tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-601: Improve distinct aggregation query processing.
Date Thu, 20 Feb 2014 06:21:07 GMT
TAJO-601: Improve distinct aggregation query processing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/6053ad11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/6053ad11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/6053ad11

Branch: refs/heads/master
Commit: 6053ad11efc31ed25c6ea8be10d4ce967c34dda3
Parents: 57bf90e
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Thu Feb 20 15:20:42 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Thu Feb 20 15:20:42 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/catalog/SortSpec.java  |   2 +-
 .../main/java/org/apache/tajo/util/TUtil.java   |   6 +
 .../apache/tajo/engine/eval/EvalTreeUtil.java   |  22 +-
 .../apache/tajo/engine/eval/FunctionEval.java   |   2 +-
 .../function/builtin/SumDoubleDistinct.java     |  98 ++++++
 .../tajo/engine/function/builtin/SumFloat.java  |  12 +-
 .../function/builtin/SumFloatDistinct.java      |  98 ++++++
 .../engine/function/builtin/SumIntDistinct.java |  98 ++++++
 .../function/builtin/SumLongDistinct.java       |  98 ++++++
 .../tajo/engine/planner/ExprsVerifier.java      |   2 +-
 .../tajo/engine/planner/LogicalPlanner.java     |  16 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  19 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |  17 +-
 .../engine/planner/PreLogicalPlanVerifier.java  |  44 +++
 .../tajo/engine/planner/enforce/Enforcer.java   |  66 ++++
 .../tajo/engine/planner/global/DataChannel.java |  11 +-
 .../engine/planner/global/GlobalPlanner.java    | 317 ++++++++++++++++---
 .../tajo/engine/planner/global/MasterPlan.java  |  14 +-
 .../planner/physical/ExternalSortExec.java      |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   4 +-
 .../planner/rewrite/FilterPushDownRule.java     |   4 +-
 .../rewrite/PartitionedTableRewriter.java       |   8 +-
 .../planner/rewrite/ProjectionPushDownRule.java |   4 +-
 .../tajo/master/querymaster/QueryMaster.java    |   2 +-
 .../querymaster/QueryMasterManagerService.java  |   2 +-
 .../tajo/master/querymaster/Repartitioner.java  |   5 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |  12 +-
 .../tajo/engine/query/TestGroupByQuery.java     |  48 ++-
 .../tajo/master/TestExecutionBlockCursor.java   |   2 +-
 .../apache/tajo/master/TestGlobalPlanner.java   |   4 +-
 .../TestGroupByQuery/testCountDistinct.sql      |   1 -
 .../TestGroupByQuery/testCountDistinct2.sql     |   1 -
 .../testDistinctAggregation1.sql                |   1 +
 .../testDistinctAggregation2.sql                |   1 +
 .../testDistinctAggregation3.sql                |   1 +
 .../testDistinctAggregation4.sql                |   1 +
 .../testDistinctAggregation5.sql                |   1 +
 .../testDistinctAggregationWithHaving1.sql      |   1 +
 .../testDistinctAggregationWithUnion1.sql       |  21 ++
 .../TestGroupByQuery/testCountDistinct.result   |   5 -
 .../TestGroupByQuery/testCountDistinct2.result  |   5 -
 .../testDistinctAggregation1.result             |   5 +
 .../testDistinctAggregation2.result             |   5 +
 .../testDistinctAggregation3.result             |   3 +
 .../testDistinctAggregation4.result             |   4 +
 .../testDistinctAggregation5.result             |   4 +
 .../testDistinctAggregationWithHaving1.result   |   3 +
 .../testDistinctAggregationWithUnion1.result    |   4 +
 .../java/org/apache/tajo/storage/RawFile.java   |   2 +-
 50 files changed, 971 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f472c87..12217d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-601: Improve distinct aggregation query processing.
+
     TAJO-305: Implement killQuery feature. (hyunsik)
 
     TAJO-598: Refactoring Tajo RPC. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
index 3ef73d5..a2c76ba 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/SortSpec.java
@@ -108,7 +108,7 @@ public class SortSpec implements Cloneable, GsonObject, ProtoObject<SortSpecProt
   }
 
   public String toString() {
-    return "Sortkey (key="+sortKey + " "+(ascending ? "asc" : "desc")+")";
+    return sortKey + " ("+(ascending ? "asc" : "desc")+")";
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index cc694d4..310d187 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.util;
 
 import com.google.common.base.Objects;
 
+import java.lang.reflect.Array;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -198,4 +199,9 @@ public class TUtil {
 
     return sb.toString();
   }
+
+  public static <T> T [] toArray(Collection<T> collection, Class<T> type) {
+    T array = (T) Array.newInstance(type, collection.size());
+    return collection.toArray((T[]) array);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index da05739..5f1035d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -67,9 +67,12 @@ public class EvalTreeUtil {
       return evalNode;
     }
   }
-  
-  public static Set<Column> findDistinctRefColumns(EvalNode node) {
-    DistinctColumnRefFinder finder = new DistinctColumnRefFinder();
+
+  /**
+   * It finds unique columns from a EvalNode.
+   */
+  public static LinkedHashSet<Column> findUniqueColumns(EvalNode node) {
+    UniqueColumnFinder finder = new UniqueColumnFinder();
     node.postOrder(finder);
     return finder.getColumnRefs();
   }
@@ -79,9 +82,6 @@ public class EvalTreeUtil {
     node.postOrder(finder);
     return finder.getColumnRefs();
   }
-
-
-
   
   public static Schema getSchemaByTargets(Schema inputSchema, Target [] targets) 
       throws InternalException {
@@ -236,20 +236,20 @@ public class EvalTreeUtil {
     }
   }
   
-  public static class DistinctColumnRefFinder implements EvalNodeVisitor {
-    private Set<Column> colList = new HashSet<Column>(); 
+  public static class UniqueColumnFinder implements EvalNodeVisitor {
+    private LinkedHashSet<Column> columnSet = Sets.newLinkedHashSet();
     private FieldEval field = null;
     
     @Override
     public void visit(EvalNode node) {
       if (node.getType() == EvalType.FIELD) {
         field = (FieldEval) node;
-        colList.add(field.getColumnRef());
+        columnSet.add(field.getColumnRef());
       }
     }
     
-    public Set<Column> getColumnRefs() {
-      return this.colList;
+    public LinkedHashSet<Column> getColumnRefs() {
+      return this.columnSet;
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
index 0555bde..de982e5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
@@ -93,7 +93,7 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
 			if(i+1 < argEvals.length)
 				sb.append(",");
 		}
-		return funcDesc.getSignature()+"("+sb+")";
+		return funcDesc.getSignature() + "(" + (isDistinct() ? " distinct" : "") + sb+")";
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
new file mode 100644
index 0000000..9b5b190
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumDoubleDistinct.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Float8Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Function definition
+ *
+ * FLOAT8 sum(distinct value FLOAT8)
+ */
+@Description(
+  functionName = "sum",
+  description = "the sum of a distinct and non-null values",
+  example = "> SELECT sum(distinct expr);",
+  returnType = Type.FLOAT8,
+  paramTypes = {@ParamTypes(paramTypes = {Type.FLOAT8})}
+)
+public class SumDoubleDistinct extends AggFunction<Datum> {
+
+  public SumDoubleDistinct() {
+    super(new Column[] {
+        new Column("expr", Type.FLOAT8)
+    });
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new SumContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+  }
+
+  @Override
+  public void merge(FunctionContext context, Tuple params) {
+    SumContext distinctContext = (SumContext) context;
+    Datum value = params.get(0);
+    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+      distinctContext.latest = value;
+      distinctContext.sum += value.asFloat8();
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    return DatumFactory.createFloat8(((SumContext) ctx).sum);
+  }
+
+  @Override
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT8);
+  }
+
+  @Override
+  public Float8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createFloat8(((SumContext) ctx).sum);
+  }
+
+  private class SumContext implements FunctionContext {
+    double sum;
+    Datum latest;
+  }
+
+  public CatalogProtos.FunctionType getFunctionType() {
+    return CatalogProtos.FunctionType.DISTINCT_AGGREGATION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
index 10fd720..72c4a6b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloat.java
@@ -33,13 +33,13 @@ import org.apache.tajo.storage.Tuple;
 /**
  * Function definition
  *
- * FLOAT4 sum(value FLOAT4)
+ * FLOAT8 sum(value FLOAT4)
  */
 @Description(
   functionName = "sum",
   description = "the sum of a set of numbers",
   example = "> SELECT sum(expr);",
-  returnType = Type.FLOAT4,
+  returnType = Type.FLOAT8,
   paramTypes = {@ParamTypes(paramTypes = {Type.FLOAT4})}
 )
 public class SumFloat extends AggFunction<Datum> {
@@ -61,20 +61,20 @@ public class SumFloat extends AggFunction<Datum> {
 
   @Override
   public Datum getPartialResult(FunctionContext ctx) {
-    return DatumFactory.createFloat4(((SumContext) ctx).sum);
+    return DatumFactory.createFloat8(((SumContext) ctx).sum);
   }
 
   @Override
   public DataType getPartialResultType() {
-    return CatalogUtil.newSimpleDataType(Type.FLOAT4);
+    return CatalogUtil.newSimpleDataType(Type.FLOAT8);
   }
 
   @Override
   public Datum terminate(FunctionContext ctx) {
-    return DatumFactory.createFloat4(((SumContext) ctx).sum);
+    return DatumFactory.createFloat8(((SumContext) ctx).sum);
   }
 
   private class SumContext implements FunctionContext {
-    private float sum;
+    private double sum;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
new file mode 100644
index 0000000..f230622
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumFloatDistinct.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Float8Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Function definition
+ *
+ * FLOAT8 sum(distinct value FLOAT4)
+ */
+@Description(
+  functionName = "sum",
+  description = "the sum of a distinct and non-null values",
+  example = "> SELECT sum(distinct expr);",
+  returnType = Type.FLOAT8,
+  paramTypes = {@ParamTypes(paramTypes = {Type.FLOAT4})}
+)
+public class SumFloatDistinct extends AggFunction<Datum> {
+
+  public SumFloatDistinct() {
+    super(new Column[] {
+        new Column("expr", Type.FLOAT4)
+    });
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new SumContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+  }
+
+  @Override
+  public void merge(FunctionContext context, Tuple params) {
+    SumContext distinctContext = (SumContext) context;
+    Datum value = params.get(0);
+    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+      distinctContext.latest = value;
+      distinctContext.sum += value.asFloat4();
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    return DatumFactory.createFloat8(((SumContext) ctx).sum);
+  }
+
+  @Override
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.FLOAT8);
+  }
+
+  @Override
+  public Float8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createFloat8(((SumContext) ctx).sum);
+  }
+
+  private class SumContext implements FunctionContext {
+    double sum;
+    Datum latest;
+  }
+
+  public CatalogProtos.FunctionType getFunctionType() {
+    return CatalogProtos.FunctionType.DISTINCT_AGGREGATION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
new file mode 100644
index 0000000..e1f2176
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumIntDistinct.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Int8Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Function definition
+ *
+ * INT8 sum(distinct value INT4)
+ */
+@Description(
+  functionName = "sum",
+  description = "the sum of a distinct and non-null values",
+  example = "> SELECT sum(distinct expr);",
+  returnType = Type.INT8,
+  paramTypes = {@ParamTypes(paramTypes = {Type.INT4})}
+)
+public class SumIntDistinct extends AggFunction<Datum> {
+
+  public SumIntDistinct() {
+    super(new Column[] {
+        new Column("expr", Type.INT4)
+    });
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new SumContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+  }
+
+  @Override
+  public void merge(FunctionContext context, Tuple params) {
+    SumContext distinctContext = (SumContext) context;
+    Datum value = params.get(0);
+    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+      distinctContext.latest = value;
+      distinctContext.sum += value.asInt4();
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    return DatumFactory.createInt8(((SumContext) ctx).sum);
+  }
+
+  @Override
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT8);
+  }
+
+  @Override
+  public Int8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createInt8(((SumContext) ctx).sum);
+  }
+
+  private class SumContext implements FunctionContext {
+    long sum;
+    Datum latest;
+  }
+
+  public CatalogProtos.FunctionType getFunctionType() {
+    return CatalogProtos.FunctionType.DISTINCT_AGGREGATION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
new file mode 100644
index 0000000..d899c37
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/SumLongDistinct.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Int8Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Function definition
+ *
+ * INT8 sum(distinct value INT8)
+ */
+@Description(
+  functionName = "sum",
+  description = "the sum of a distinct and non-null values",
+  example = "> SELECT sum(distinct expr);",
+  returnType = Type.INT8,
+  paramTypes = {@ParamTypes(paramTypes = {Type.INT8})}
+)
+public class SumLongDistinct extends AggFunction<Datum> {
+
+  public SumLongDistinct() {
+    super(new Column[] {
+        new Column("expr", Type.INT8)
+    });
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new SumContext();
+  }
+
+  @Override
+  public void eval(FunctionContext ctx, Tuple params) {
+  }
+
+  @Override
+  public void merge(FunctionContext context, Tuple params) {
+    SumContext distinctContext = (SumContext) context;
+    Datum value = params.get(0);
+    if ((distinctContext.latest == null || (!distinctContext.latest.equals(value)) && !(value instanceof NullDatum))) {
+      distinctContext.latest = value;
+      distinctContext.sum += value.asInt8();
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    return DatumFactory.createInt8(((SumContext) ctx).sum);
+  }
+
+  @Override
+  public DataType getPartialResultType() {
+    return CatalogUtil.newSimpleDataType(Type.INT8);
+  }
+
+  @Override
+  public Int8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createInt8(((SumContext) ctx).sum);
+  }
+
+  private class SumContext implements FunctionContext {
+    long sum;
+    Datum latest;
+  }
+
+  public CatalogProtos.FunctionType getFunctionType() {
+    return CatalogProtos.FunctionType.DISTINCT_AGGREGATION;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
index b14c448..87ced21 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
@@ -45,7 +45,7 @@ public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalN
   public static VerificationState verify(VerificationState state, LogicalNode currentNode, EvalNode expression)
       throws PlanningException {
     instance.visitChild(state, expression, new Stack<EvalNode>());
-    Set<Column> referredColumns = EvalTreeUtil.findDistinctRefColumns(expression);
+    Set<Column> referredColumns = EvalTreeUtil.findUniqueColumns(expression);
     for (Column referredColumn : referredColumns) {
       if (!currentNode.getInSchema().contains(referredColumn)) {
         throw new PlanningException("Invalid State: " + referredColumn + " cannot be referred at Node ("

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index f7c0bfa..e5e84a7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -352,7 +352,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   public static void verifyProjectedFields(QueryBlock block, Projectable projectable) throws PlanningException {
     if (projectable instanceof ProjectionNode && block.hasNode(NodeType.GROUP_BY)) {
       for (Target target : projectable.getTargets()) {
-        Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
         for (Column c : columns) {
           if (!projectable.getInSchema().contains(c)) {
             throw new PlanningException(c.getQualifiedName()
@@ -371,7 +371,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       }
       if (groupbyNode.hasAggFunctions()) {
         for (AggregationFunctionCallEval f : groupbyNode.getAggFunctions()) {
-          Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(f);
+          Set<Column> columns = EvalTreeUtil.findUniqueColumns(f);
           for (Column c : columns) {
             if (!projectable.getInSchema().contains(c)) {
               throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
@@ -383,7 +383,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     } else if (projectable instanceof RelationNode) {
       RelationNode relationNode = (RelationNode) projectable;
       for (Target target : projectable.getTargets()) {
-        Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
         for (Column c : columns) {
           if (!relationNode.getTableSchema().contains(c)) {
             throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
@@ -393,7 +393,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       }
     } else {
       for (Target target : projectable.getTargets()) {
-        Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+        Set<Column> columns = EvalTreeUtil.findUniqueColumns(target.getEvalTree());
         for (Column c : columns) {
           if (!projectable.getInSchema().contains(c)) {
             throw new PlanningException(String.format("Cannot get the field \"%s\" at node (%d)",
@@ -1433,7 +1433,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   ===============================================================================================*/
 
   public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode node) {
-    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
 
     if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
       return false;
@@ -1444,7 +1444,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
   public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode node,
                                                  boolean isTopMostJoin) {
-    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
 
     if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
       return false;
@@ -1480,7 +1480,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * It checks if evalNode can be evaluated at this @{link RelationNode}.
    */
   public static boolean checkIfBeEvaluatedAtRelation(QueryBlock block, EvalNode evalNode, RelationNode node) {
-    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
 
     // aggregation functions cannot be evaluated in scan node
     if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
@@ -1503,7 +1503,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   public static boolean checkIfBeEvaluatedAtThis(EvalNode evalNode, LogicalNode node) {
-    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
     if (columnRefs.size() > 0 && !node.getInSchema().containsAll(columnRefs)) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 67f5630..7ba2889 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -23,6 +23,7 @@ package org.apache.tajo.engine.planner;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.ObjectArrays;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -909,21 +910,29 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
     if (property != null) {
       List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
-      SortSpec[] enforcedSortSpecs = new SortSpec[sortSpecProtos.size()];
+
+      List<SortSpec> enforcedSortSpecList = Lists.newArrayList();
       int i = 0;
+      outer:
+      for (int j = 0; j < sortSpecProtos.size(); j++) {
+        SortSpec enforcedSortSpecs = new SortSpec(sortSpecProtos.get(j));
+
+        for (Column grpKey : grpColumns) { // if this sort key is included in grouping columns, skip it.
+          if (enforcedSortSpecs.getSortKey().equals(grpKey)) {
+            continue outer;
+          }
+        }
 
-      for (int j = 0; j < sortSpecProtos.size(); i++, j++) {
-        enforcedSortSpecs[i] = new SortSpec(sortSpecProtos.get(j));
+        enforcedSortSpecList.add(enforcedSortSpecs);
       }
 
-      sortSpecs = ObjectArrays.concat(sortSpecs, enforcedSortSpecs, SortSpec.class);
+      sortSpecs = ObjectArrays.concat(sortSpecs, TUtil.toArray(enforcedSortSpecList, SortSpec.class), SortSpec.class);
     }
 
     SortNode sortNode = new SortNode(-1);
     sortNode.setSortSpecs(sortSpecs);
     sortNode.setInSchema(subOp.getSchema());
     sortNode.setOutSchema(subOp.getSchema());
-    // SortExec sortExec = new SortExec(sortNode, child);
     ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
     LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
     return new SortAggregateExec(ctx, groupbyNode, sortExec);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 7d5e2fc..8017331 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -29,7 +29,6 @@ import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
@@ -336,7 +335,7 @@ public class PlannerUtil {
   }
 
   public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {
-    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
+    Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(eval);
 
     if (node.getType() == NodeType.JOIN) {
       JoinNode joinNode = (JoinNode) node;
@@ -710,20 +709,6 @@ public class PlannerUtil {
     return joinType == JoinType.INNER;
   }
 
-  public static Schema rewriteColumnPartitionedTableSchema(
-                               PartitionMethodDesc partitionDesc,
-                               Schema columnPartitionSchema,
-                               Schema sourceSchema,
-                               String qualifier) {
-    Schema schema = new Schema();
-    for (Column column : sourceSchema.toArray()) {
-      if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
-        schema.addColumn(column);
-      }
-    }
-    return schema;
-  }
-
   public static boolean existsAggregationFunction(Expr expr) throws PlanningException {
     AggregationFunctionFinder finder = new AggregationFunctionFinder();
     AggFunctionFoundResult result = new AggFunctionFoundResult();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 6dac031..024a9ae 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -18,10 +18,12 @@
 
 package org.apache.tajo.engine.planner;
 
+import com.google.common.collect.ObjectArrays;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.util.TUtil;
 
+import java.util.Arrays;
 import java.util.Set;
 import java.util.Stack;
 
@@ -33,8 +35,13 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
   }
 
   public Expr visitProjection(VerificationState state, Stack<Expr> stack, Projection expr) throws PlanningException {
+    super.visitProjection(state, stack, expr);
+
     Set<String> names = TUtil.newHashSet();
+    Expr [] distinctValues = null;
+
     for (NamedExpr namedExpr : expr.getNamedExprs()) {
+
       if (namedExpr.hasAlias()) {
         if (names.contains(namedExpr.getAlias())) {
           state.addVerification(String.format("column name \"%s\" specified more than once", namedExpr.getAlias()));
@@ -42,6 +49,43 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
           names.add(namedExpr.getAlias());
         }
       }
+
+      // No two DISTINCT aggregations may use different DISTINCT columns.
+      //
+      // For example, the following query works:
+      //   SELECT count(DISTINCT col1), sum(DISTINCT col1) ..
+      // But the following query is not supported at this time:
+      //   SELECT count(DISTINCT col1), sum(DISTINCT col2) ..
+      // Only DISTINCT functions are compared; otherwise a non-distinct function following a DISTINCT
+      // one (e.g., count(DISTINCT col1), count(*)) would be rejected depending on the target order.
+      Set<GeneralSetFunctionExpr> exprs = ExprFinder.finds(namedExpr.getExpr(), OpType.GeneralSetFunction);
+      if (exprs.size() > 0) {
+        for (GeneralSetFunctionExpr setFunction : exprs) {
+          if (distinctValues == null && setFunction.isDistinct()) {
+            distinctValues = setFunction.getParams();
+          } else if (distinctValues != null && setFunction.isDistinct()) {
+            if (!Arrays.equals(distinctValues, setFunction.getParams())) {
+              Expr [] differences = ObjectArrays.concat(distinctValues, setFunction.getParams(), Expr.class);
+              throw new PlanningException("different DISTINCT columns are not supported yet: "
+                  + TUtil.arrayToString(differences));
+            }
+          }
+        }
+      }
+
+      // Currently, avg functions with distinct aggregation are not supported.
+      // This code does not allow users to use avg functions with distinct aggregation.
+      if (distinctValues != null) {
+        for (GeneralSetFunctionExpr setFunction : exprs) {
+          if (setFunction.getSignature().equalsIgnoreCase("avg")) {
+            if (setFunction.isDistinct()) {
+              throw new PlanningException("avg(distinct) function is not supported yet.");
+            } else {
+              throw new PlanningException("avg() function with distinct aggregation functions is not supported yet.");
+            }
+          }
+        }
+      }
     }
     return expr;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index dd46996..da8cb01 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.enforce;
 
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.util.TUtil;
 
@@ -193,4 +194,69 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
     builder.addAllProperties(getProperties());
     return builder.build();
   }
+  /** Builds a short textual description of an enforce property; the layout depends on the property type. */
+  public static String toString(EnforceProperty property) {
+    StringBuilder sb = new StringBuilder();
+    switch (property.getType()) {
+    case GROUP_BY:
+      GroupbyEnforce groupby = property.getGroupby();
+      sb.append("type=GroupBy,alg=");
+      if (groupby.getAlgorithm() == GroupbyAlgorithm.HASH_AGGREGATION) {
+        sb.append("hash");
+      } else { // any other algorithm is reported as "sort", followed by its sort keys
+        sb.append("sort");
+        sb.append(",keys=");
+        boolean first = true;
+        for (CatalogProtos.SortSpecProto sortSpec : groupby.getSortSpecsList()) {
+          if (first == true) {
+            first = false;
+          } else {
+            sb.append(", ");
+          }
+          sb.append(sortSpec.getColumn().getColumnName());
+          sb.append(" (").append(sortSpec.getAscending() ? "asc":"desc").append(")");
+        }
+      }
+      break;
+    case BROADCAST:
+      BroadcastEnforce broadcast = property.getBroadcast();
+      sb.append("type=Broadcast, tables=").append(broadcast.getTableName());
+      break;
+    case COLUMN_PARTITION:
+      ColumnPartitionEnforcer columnPartition = property.getColumnPartition();
+      sb.append("type=ColumnPartition, alg=");
+      if (columnPartition.getAlgorithm() == ColumnPartitionAlgorithm.SORT_PARTITION) {
+        sb.append("sort");
+      } else { // any non-sort algorithm is reported as "hash"
+        sb.append("hash");
+      }
+      break;
+    case JOIN:
+      JoinEnforce join = property.getJoin();
+      sb.append("type=Join,alg=");
+      if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.MERGE_JOIN) {
+        sb.append("merge_join");
+      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.NESTED_LOOP_JOIN) {
+        sb.append("nested_loop");
+      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN) {
+        sb.append("block_nested_loop");
+      } else if (join.getAlgorithm() == JoinEnforce.JoinAlgorithm.IN_MEMORY_HASH_JOIN) {
+        sb.append("in_memory_hash");
+      } // any other join algorithm leaves the algorithm name empty
+      break;
+    case OUTPUT_DISTINCT: // NOTE(review): no break, so this is rendered by the SORT branch below - confirm intended
+    case SORT:
+      SortEnforce sort = property.getSort();
+      sb.append("type=Sort,alg=");
+      if (sort.getAlgorithm() == SortEnforce.SortAlgorithm.IN_MEMORY_SORT) {
+        sb.append("in-memory");
+      } else { // any other algorithm is reported as "external"
+        sb.append("external");
+      }
+      break;
+    case SORTED_INPUT: // nothing is appended, so an empty string is returned for this type
+    }
+
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
index 9f5c6bf..b3b5bb0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/DataChannel.java
@@ -23,6 +23,7 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.util.TUtil;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
@@ -190,15 +191,7 @@ public class DataChannel {
     sb.append(" (type=").append(shuffleType);
     if (hasShuffleKeys()) {
       sb.append(", key=");
-      boolean first = true;
-      for (Column column : getShuffleKeys()) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(",");
-        }
-        sb.append(column.getColumnName());
-      }
+      sb.append(TUtil.arrayToString(shuffleKeys));
       sb.append(", num=").append(numOutputs);
     }
     sb.append(")");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index f390b52..15aa00f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -29,12 +29,19 @@ import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.function.AggFunction;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
-import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TajoWorker;
 
 import java.io.IOException;
 import java.util.*;
@@ -48,15 +55,38 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.*;
 public class GlobalPlanner {
   private static Log LOG = LogFactory.getLog(GlobalPlanner.class);
 
-  private TajoConf conf;
-  private CatalogProtos.StoreType storeType;
+  private final TajoConf conf;
+  private final CatalogProtos.StoreType storeType;
+  private CatalogService catalog;
+  private TajoWorker.WorkerContext workerContext;
 
-  public GlobalPlanner(final TajoConf conf, final AbstractStorageManager sm) throws IOException {
+  public GlobalPlanner(final TajoConf conf, final CatalogService catalog) throws IOException {
     this.conf = conf;
+    this.catalog = catalog;
     this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
     Preconditions.checkArgument(storeType != null);
   }
 
+  public GlobalPlanner(final TajoConf conf, final TajoWorker.WorkerContext workerContext) throws IOException {
+    this.conf = conf;
+    this.workerContext = workerContext;
+    this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
+    Preconditions.checkArgument(storeType != null);
+  }
+
+  /**
+   * Returns the catalog of the worker context when a worker context was supplied and exposes one,
+   * otherwise the catalog passed to the constructor. TODO: this is a hack; refactor it at TAJO-602.
+   *
+   * @throws IllegalStateException if neither source provides a catalog
+   */
+  public CatalogService getCatalog() {
+    CatalogService workerCatalog = workerContext != null ? workerContext.getCatalog() : null;
+    CatalogService resolved = workerCatalog != null ? workerCatalog : catalog;
+    Preconditions.checkState(resolved != null, "No Catalog Instance");
+    return resolved;
+  }
+
   public class GlobalPlanContext {
     MasterPlan plan;
     Map<Integer, ExecutionBlock> execBlockMap = Maps.newHashMap();
@@ -111,7 +141,7 @@ public class GlobalPlanner {
 
   public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
     Preconditions.checkArgument(channel.getSchema() != null,
-        "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
+        "Channel schema (" + channel.getSrcId().getId() + " -> " + channel.getTargetId().getId() + ") is not initialized");
     TableMeta meta = new TableMeta(channel.getStoreType(), new Options());
     TableDesc desc = new TableDesc(channel.getSrcId().toString(), channel.getSchema(), meta, new Path("/"));
     return new ScanNode(plan.newPID(), desc);
@@ -237,52 +267,261 @@ public class GlobalPlanner {
     return currentBlock;
   }
 
+  private AggregationFunctionCallEval createSumFunction(EvalNode [] args) throws InternalException {
+    FunctionDesc functionDesc = getCatalog().getFunction("sum", CatalogProtos.FunctionType.AGGREGATION,
+        args[0].getValueType());
+    return new AggregationFunctionCallEval(functionDesc, (AggFunction) functionDesc.newInstance(), args);
+  }
+
+  private AggregationFunctionCallEval createCountFunction(EvalNode [] args) throws InternalException {
+    FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION,
+        args[0].getValueType());
+    return new AggregationFunctionCallEval(functionDesc, (AggFunction) functionDesc.newInstance(), args);
+  }
+
+  private AggregationFunctionCallEval createCountRowFunction(EvalNode[] args) throws InternalException {
+    FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION,
+        new TajoDataTypes.DataType[]{});
+    return new AggregationFunctionCallEval(functionDesc, (AggFunction) functionDesc.newInstance(), args);
+  }
+
+  private AggregationFunctionCallEval createMaxFunction(EvalNode [] args) throws InternalException {
+    FunctionDesc functionDesc = getCatalog().getFunction("max", CatalogProtos.FunctionType.AGGREGATION,
+        args[0].getValueType());
+    return new AggregationFunctionCallEval(functionDesc, (AggFunction) functionDesc.newInstance(), args);
+  }
+
+  private AggregationFunctionCallEval createMinFunction(EvalNode [] args) throws InternalException {
+    FunctionDesc functionDesc = getCatalog().getFunction("min", CatalogProtos.FunctionType.AGGREGATION,
+        args[0].getValueType());
+    return new AggregationFunctionCallEval(functionDesc, (AggFunction) functionDesc.newInstance(), args);
+  }
+
+  /**
+   * Holds the functions produced by rewriting one non-distinct aggregation function, and their related data.
+   * Each non-distinct function is transformed into first-stage function(s) and one second-stage function.
+   */
+  private static class RewrittenFunctions {
+    AggregationFunctionCallEval [] firstStageEvals; // functions to be evaluated in the first stage
+    Target [] firstStageTargets; // allocated with the same length as firstStageEvals
+    AggregationFunctionCallEval secondStageEvals; // single second-stage function; not set by the constructor
+
+    public RewrittenFunctions(int firstStageEvalNum) {
+      firstStageEvals = new AggregationFunctionCallEval[firstStageEvalNum];
+      firstStageTargets = new Target[firstStageEvalNum];
+    }
+  }
+
   /**
-   * If a query contains a distinct aggregation function, the query does not
-   * perform pre-aggregation in the first phase. Instead, in the fist phase,
-   * the query performs only hash shuffle. Then, the query performs the
-   * sort aggregation in the second phase. At that time, the aggregation
-   * function should be executed as the first phase.
+   * Tajo uses three execution blocks for an aggregation operator including distinct aggregations.
+   * We call this approach <i><b>three-phase aggregation</b></i>.
+   *
+   * In this case, non-distinct set functions (i.e., <code>count(1), sum(col1)</code>) should be rewritten
+   * to other forms. Please see the following example. This is a rewriting case for a query which includes distinct
+   * aggregation functions. In this example, <code>count(*)</code> functions are transformed into two
+   * functions: count(*) in the inner query and sum() in the outer query.
+   *
+   * <h2>Original query</h2>
+   * <pre>
+   * SELECT
+   *   grp1, grp2, count(*) as total, count(distinct grp3) as distinct_col
+   * from
+   *   rel1
+   * group by
+   *   grp1, grp2;
+   * </pre>
+   *
+   * <h2>Rewritten query</h2>
+   * <pre>
+   * SELECT grp1, grp2, sum(cnt) as total, count(grp3) as distinct_col from (
+   *   SELECT
+   *     grp1, grp2, grp3, count(*) as cnt
+   *   from
+   *     rel1
+   *   group by
+   *     grp1, grp2, grp3
+   * ) tmp1
+   * group by
+   *   grp1, grp2;
+   * </pre>
+   *
+   * The main objective of this method is to transform non-distinct aggregation functions for three-phase aggregation.
    */
-  private ExecutionBlock buildDistinctGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
-                                              GroupbyNode groupbyNode) {
-    // setup child block
-    LogicalNode topMostOfFirstPhase = groupbyNode.getChild();
-    childBlock.setPlan(topMostOfFirstPhase);
-
-    // setup current block
-    ExecutionBlock currentBlock = context.plan.newExecutionBlock();
-    LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
-    for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
-      if (function.isDistinct()) {
-        columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+  private RewrittenFunctions rewriteAggFunctionsForDistinctAggregation(GlobalPlanContext context,
+                                                                       AggregationFunctionCallEval function)
+      throws PlanningException {
+    LogicalPlan plan = context.plan.getLogicalPlan();
+    RewrittenFunctions rewritten = null;
+    // Each first-stage result column is aggregated again in the second stage; partial counts are summed up.
+    try {
+      if (function.getName().equalsIgnoreCase("count")) {
+        rewritten = new RewrittenFunctions(1);
+        // count(*) carries no argument, so the zero-argument count signature is looked up for it.
+        if (function.getArgs().length == 0) {
+          rewritten.firstStageEvals[0] = createCountRowFunction(function.getArgs());
+        } else {
+          rewritten.firstStageEvals[0] = createCountFunction(function.getArgs());
+        }
+        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+        rewritten.firstStageTargets[0] = new Target(fieldEval);
+        rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval});
+      } else if (function.getName().equalsIgnoreCase("sum")) {
+        rewritten = new RewrittenFunctions(1);
+
+        rewritten.firstStageEvals[0] = createSumFunction(function.getArgs());
+        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+        rewritten.firstStageTargets[0] = new Target(fieldEval);
+        rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval});
+
+      } else if (function.getName().equalsIgnoreCase("max")) {
+        rewritten = new RewrittenFunctions(1);
+
+        rewritten.firstStageEvals[0] = createMaxFunction(function.getArgs());
+        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+        rewritten.firstStageTargets[0] = new Target(fieldEval);
+        rewritten.secondStageEvals = createMaxFunction(new EvalNode[]{fieldEval});
+
+      } else if (function.getName().equalsIgnoreCase("min")) {
+        rewritten = new RewrittenFunctions(1);
+
+        rewritten.firstStageEvals[0] = createMinFunction(function.getArgs());
+        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+        rewritten.firstStageTargets[0] = new Target(fieldEval);
+        rewritten.secondStageEvals = createMinFunction(new EvalNode[]{fieldEval});
+
       } else {
-        // See the comment of this method. the aggregation function should be executed as the first phase.
-        function.setFirstPhase();
+        throw new PlanningException("Cannot support a mix of other functions");
       }
+    } catch (InternalException e) {
+      // Fail the planning here instead of returning null, which the caller would dereference.
+      LOG.error(e);
+      throw new PlanningException(e.getMessage());
     }
 
-    // Set sort aggregation enforcer to the second groupby node
-    Set<Column> existingColumns = Sets.newHashSet(groupbyNode.getGroupingColumns());
-    columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
-    SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
-    currentBlock.getEnforcer().enforceSortAggregation(groupbyNode.getPID(), sortSpecs);
+    return rewritten;
+  }
+
+  /**
+   * If there is at least one distinct aggregation function, the query works as if it were rewritten as follows:
+   *
+   * <h2>Original query</h2>
+   * <pre>
+   * SELECT
+   *   grp1, grp2, count(*) as total, count(distinct grp3) as distinct_col
+   * from
+   *   rel1
+   * group by
+   *   grp1, grp2;
+   * </pre>
+   *
+   * The query will work as if the query is rewritten into two queries as follows:
+   *
+   * <h2>Rewritten query</h2>
+   * <pre>
+   * SELECT grp1, grp2, sum(cnt) as total, count(grp3) as distinct_col from (
+   *   SELECT
+   *     grp1, grp2, grp3, count(*) as cnt
+   *   from
+   *     rel1
+   *   group by
+   *     grp1, grp2, grp3
+   * ) tmp1
+   * group by
+   *   grp1, grp2;
+   * </pre>
+   *
+   * In more detail, the first aggregation aggregates not only original grouping fields but also distinct columns.
+   * Non-distinct aggregation functions should be transformed to proper functions.
+   * Then, the second aggregation aggregates only original grouping fields with distinct aggregation functions and
+   * transformed non-distinct aggregation functions.
+   *
+   * As a result, although a non-distinct aggregation requires two stages, a distinct aggregation requires three
+   * execution blocks.
+   */
+  private ExecutionBlock buildGroupByIncludingDistinctFunctions(GlobalPlanContext context,
+                                                                ExecutionBlock latestExecBlock,
+                                                                GroupbyNode groupbyNode) throws PlanningException {
+
+    Column [] originalGroupingColumns = groupbyNode.getGroupingColumns();
+    LinkedHashSet<Column> firstStageGroupingColumns =
+        Sets.newLinkedHashSet(Arrays.asList(groupbyNode.getGroupingColumns()));
+    List<AggregationFunctionCallEval> firstStageAggFunctions = Lists.newArrayList();
+    List<AggregationFunctionCallEval> secondPhaseEvalNodes = Lists.newArrayList();
+    List<Target> firstPhaseEvalNodeTargets = Lists.newArrayList();
+
+    for (AggregationFunctionCallEval aggFunction : groupbyNode.getAggFunctions()) {
 
+      if (aggFunction.isDistinct()) {
+        // add distinct columns to first stage's grouping columns
+        firstStageGroupingColumns.addAll(EvalTreeUtil.findUniqueColumns(aggFunction));
+        // keep distinct aggregation functions for the second stage
+        secondPhaseEvalNodes.add(aggFunction);
 
-    // setup channel
+      } else {
+        // Rewrite non-distinct aggregation functions
+        RewrittenFunctions rewritten = rewriteAggFunctionsForDistinctAggregation(context, aggFunction);
+        firstStageAggFunctions.addAll(Lists.newArrayList(rewritten.firstStageEvals));
+        firstPhaseEvalNodeTargets.addAll(Lists.newArrayList(rewritten.firstStageTargets));
+
+        // keep rewritten non-distinct aggregation functions for the second stage
+        secondPhaseEvalNodes.add(rewritten.secondStageEvals);
+      }
+    }
+
+    int firstStageAggFunctionNum = firstStageAggFunctions.size();
+    int firstStageGroupingKeyNum = firstStageGroupingColumns.size();
+
+    int i = 0;
+    Target [] firstStageTargets = new Target[firstStageGroupingKeyNum + firstStageAggFunctionNum];
+    for (Column column : firstStageGroupingColumns) {
+      Target target = new Target(new FieldEval(column));
+      firstStageTargets[i++] = target;
+    }
+    for (Target target : firstPhaseEvalNodeTargets) {
+      firstStageTargets[i++] = target;
+    }
+    // Create the groupby node for the first stage and set all necessary descriptions
+    GroupbyNode firstStageGroupby = new GroupbyNode(context.plan.getLogicalPlan().newPID());
+    firstStageGroupby.setGroupingColumns(TUtil.toArray(firstStageGroupingColumns, Column.class));
+    firstStageGroupby.setAggFunctions(TUtil.toArray(firstStageAggFunctions, AggregationFunctionCallEval.class));
+    firstStageGroupby.setTargets(firstStageTargets);
+    firstStageGroupby.setChild(groupbyNode.getChild());
+    firstStageGroupby.setInSchema(groupbyNode.getInSchema());
+
+    // Makes two execution blocks for the first stage
+    ExecutionBlock firstStage = buildGroupBy(context, latestExecBlock, firstStageGroupby);
+
+    // Create the groupby node for the second stage.
+    GroupbyNode secondPhaseGroupby = new GroupbyNode(context.plan.getLogicalPlan().newPID());
+    secondPhaseGroupby.setGroupingColumns(originalGroupingColumns);
+    secondPhaseGroupby.setAggFunctions(TUtil.toArray(secondPhaseEvalNodes, AggregationFunctionCallEval.class));
+    secondPhaseGroupby.setTargets(groupbyNode.getTargets());
+
+    ExecutionBlock secondStage = context.plan.newExecutionBlock();
+    secondStage.setPlan(secondPhaseGroupby);
+    SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(firstStageGroupingColumns);
+    secondStage.getEnforcer().enforceSortAggregation(secondPhaseGroupby.getPID(), sortSpecs);
+
+    // Create a data channel between the first and second stages
     DataChannel channel;
-    channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
-    channel.setShuffleKeys(groupbyNode.getGroupingColumns());
-    channel.setSchema(topMostOfFirstPhase.getOutSchema());
+    channel = new DataChannel(firstStage, secondStage, HASH_SHUFFLE, 32);
+    channel.setShuffleKeys(secondPhaseGroupby.getGroupingColumns().clone());
+    channel.setSchema(firstStage.getPlan().getOutSchema());
     channel.setStoreType(storeType);
 
-    // setup current block with channel
+    // Setting for the second phase's logical plan
     ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), channel);
-    groupbyNode.setChild(scanNode);
-    currentBlock.setPlan(groupbyNode);
+    secondPhaseGroupby.setChild(scanNode);
+    secondPhaseGroupby.setInSchema(scanNode.getOutSchema());
+    secondStage.setPlan(secondPhaseGroupby);
+
     context.plan.addConnect(channel);
 
-    return currentBlock;
+    return secondStage;
   }
 
   private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
@@ -292,7 +531,7 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock;
 
     if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function
-      return buildDistinctGroupBy(context, lastBlock, groupbyNode);
+      return buildGroupByIncludingDistinctFunctions(context, lastBlock, groupbyNode);
     } else {
       GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 91f658d..e72ba05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -24,6 +24,7 @@ package org.apache.tajo.engine.planner.global;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -235,7 +236,7 @@ public class MasterPlan {
       }
 
       if (block.getBroadcastTables().size() > 0) {
-        sb.append("\nBroadcasted Tables: ").append(TUtil.collectionToString(block.getBroadcastTables()));
+        sb.append("[Broadcasted Tables]: ").append(TUtil.collectionToString(block.getBroadcastTables()));
         sb.append("\n");
       }
 
@@ -253,6 +254,17 @@ public class MasterPlan {
           sb.append("\n");
         }
       }
+
+      if (block.getEnforcer().getProperties().size() > 0) {
+        sb.append("\n[Enforcers]\n");
+        int i = 0;
+        for (TajoWorkerProtocol.EnforceProperty enforce : block.getEnforcer().getProperties()) {
+          sb.append(" ").append(i++).append(": ");
+          sb.append(Enforcer.toString(enforce));
+          sb.append("\n");
+        }
+      }
+
       sb.append("\n").append(block.getPlan());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 791781e..2dfbef4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -545,7 +545,7 @@ public class ExternalSortExec extends SortExec {
           outTuple = rightTuple;
           rightTuple = rightScan.next();
         }
-        return new VTuple(outTuple);
+        return outTuple;
       }
 
       if (leftTuple == null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 53a1c24..c495470 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -125,11 +125,11 @@ public class SeqScanExec extends PhysicalExec {
       Set<Column> columnSet = new HashSet<Column>();
 
       if (plan.hasQual()) {
-        columnSet.addAll(EvalTreeUtil.findDistinctRefColumns(qual));
+        columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
       }
 
       for (Target t : plan.getTargets()) {
-        columnSet.addAll(EvalTreeUtil.findDistinctRefColumns(t.getEvalTree()));
+        columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
       }
 
       for (Column column : inSchema.getColumns()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 399903c..f1daec9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -23,9 +23,9 @@ import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;
@@ -158,7 +158,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
          List<EvalNode> matched2 = Lists.newArrayList();
          for (EvalNode eval : cnf) {
 
-            Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
+            Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(eval);
             Set<String> tableNames = Sets.newHashSet();
             // getting distinct table references
             for (Column col : columnRefs) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index e5f7fb4..faef37d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -188,7 +188,7 @@ public class PartitionedTableRewriter implements RewriteRule {
       target = partitionColumns.getColumn(i);
 
       for (EvalNode expr : conjunctiveForms) {
-        if (EvalTreeUtil.findDistinctRefColumns(expr).contains(target)) {
+        if (EvalTreeUtil.findUniqueColumns(expr).contains(target)) {
           // Accumulate one qual per level
           accumulatedFilters.add(expr);
         }
@@ -281,7 +281,7 @@ public class PartitionedTableRewriter implements RewriteRule {
 
   private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
     if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
-      Set<Column> variables = EvalTreeUtil.findDistinctRefColumns(evalNode);
+      Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
       // if it contains only single variable matched to a target column
       return variables.size() == 1 && variables.contains(targetColumn);
     } else {
@@ -314,8 +314,8 @@ public class PartitionedTableRewriter implements RewriteRule {
               checkIfIndexablePredicate(evalNode.getRightExpr());
 
       boolean sameVariable =
-          EvalTreeUtil.findDistinctRefColumns(evalNode.getLeftExpr())
-          .equals(EvalTreeUtil.findDistinctRefColumns(evalNode.getRightExpr()));
+          EvalTreeUtil.findUniqueColumns(evalNode.getLeftExpr())
+          .equals(EvalTreeUtil.findUniqueColumns(evalNode.getRightExpr()));
 
       return indexable && sameVariable;
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 633d0c1..4f3c6d4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -186,7 +186,7 @@ public class ProjectionPushDownRule extends
       TUtil.putToNestedList(idToNamesMap, refId, specifiedName);
       evaluationStateMap.put(specifiedName, false);
 
-      for (Column column : EvalTreeUtil.findDistinctRefColumns(evalNode)) {
+      for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
         add(new FieldEval(column));
       }
 
@@ -365,7 +365,7 @@ public class ProjectionPushDownRule extends
     }
 
     private void addNecessaryReferences(EvalNode evalNode) {
-      for (Column column : EvalTreeUtil.findDistinctRefColumns(evalNode)) {
+      for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
         requiredSet.add(column.getQualifiedName());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index ae6d5eb..8a8772d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -111,7 +111,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
       this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
 
-      globalPlanner = new GlobalPlanner(systemConf, storageManager);
+      globalPlanner = new GlobalPlanner(systemConf, workerContext);
 
       dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 3c30e38..70d40e7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -126,7 +126,7 @@ public class QueryMasterManagerService extends CompositeService
       ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
       QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
 
-      if(queryMasterTask.isStopped()) {
+      if(queryMasterTask == null || queryMasterTask.isStopped()) {
         done.run(LazyTaskScheduler.stopTaskRunnerReq);
       } else {
         ContainerId cid =

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 56c2679..33e46fd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -557,7 +557,7 @@ public class Repartitioner {
 
   public static SubQuery setShuffleOutputNumForTwoPhase(SubQuery subQuery, final int desiredNum, DataChannel channel) {
     ExecutionBlock execBlock = subQuery.getBlock();
-    Column[] keys = null;
+    Column[] keys;
     // if the next query is join,
     // set the partition number for the current logicalUnit
     // TODO: the union handling is required when a join has unions as its child
@@ -574,8 +574,7 @@ public class Repartitioner {
     // set the partition number for group by and sort
     if (channel.getShuffleType() == HASH_SHUFFLE) {
       if (execBlock.getPlan().getType() == NodeType.GROUP_BY) {
-        GroupbyNode groupby = (GroupbyNode) execBlock.getPlan();
-        keys = groupby.getGroupingColumns();
+        keys = channel.getShuffleKeys();
       }
     } else if (channel.getShuffleType() == RANGE_SHUFFLE) {
       if (execBlock.getPlan().getType() == NodeType.SORT) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index d756242..6038124 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -151,20 +151,20 @@ public class TestEvalTreeUtil {
   public final void testChangeColumnRef() throws CloneNotSupportedException {
     EvalNode copy = (EvalNode)expr1.clone();
     EvalTreeUtil.changeColumnRef(copy, "people.score", "newscore");
-    Set<Column> set = EvalTreeUtil.findDistinctRefColumns(copy);
+    Set<Column> set = EvalTreeUtil.findUniqueColumns(copy);
     assertEquals(1, set.size());
     assertTrue(set.contains(new Column("newscore", TajoDataTypes.Type.INT4)));
 
     copy = (EvalNode)expr2.clone();
     EvalTreeUtil.changeColumnRef(copy, "people.age", "sum_age");
-    set = EvalTreeUtil.findDistinctRefColumns(copy);
+    set = EvalTreeUtil.findUniqueColumns(copy);
     assertEquals(2, set.size());
     assertTrue(set.contains(new Column("people.score", TajoDataTypes.Type.INT4)));
     assertTrue(set.contains(new Column("sum_age", TajoDataTypes.Type.INT4)));
 
     copy = (EvalNode)expr3.clone();
     EvalTreeUtil.changeColumnRef(copy, "people.age", "sum_age");
-    set = EvalTreeUtil.findDistinctRefColumns(copy);
+    set = EvalTreeUtil.findUniqueColumns(copy);
     assertEquals(2, set.size());
     assertTrue(set.contains(new Column("people.score", TajoDataTypes.Type.INT4)));
     assertTrue(set.contains(new Column("sum_age", TajoDataTypes.Type.INT4)));
@@ -172,16 +172,16 @@ public class TestEvalTreeUtil {
 
   @Test
   public final void testFindAllRefColumns() {    
-    Set<Column> set = EvalTreeUtil.findDistinctRefColumns(expr1);
+    Set<Column> set = EvalTreeUtil.findUniqueColumns(expr1);
     assertEquals(1, set.size());
     assertTrue(set.contains(new Column("people.score", TajoDataTypes.Type.INT4)));
     
-    set = EvalTreeUtil.findDistinctRefColumns(expr2);
+    set = EvalTreeUtil.findUniqueColumns(expr2);
     assertEquals(2, set.size());
     assertTrue(set.contains(new Column("people.score", TajoDataTypes.Type.INT4)));
     assertTrue(set.contains(new Column("people.age", TajoDataTypes.Type.INT4)));
     
-    set = EvalTreeUtil.findDistinctRefColumns(expr3);
+    set = EvalTreeUtil.findUniqueColumns(expr3);
     assertEquals(2, set.size());
     assertTrue(set.contains(new Column("people.score", TajoDataTypes.Type.INT4)));
     assertTrue(set.contains(new Column("people.age", TajoDataTypes.Type.INT4)));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index 1f80bce..8ecf8ed 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -125,7 +125,7 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
-  public final void testCountDistinct() throws Exception {
+  public final void testDistinctAggregation1() throws Exception {
     // select l_orderkey, max(l_orderkey) as maximum, count(distinct l_linenumber) as unique_key from lineitem
     // group by l_orderkey;
     ResultSet res = executeQuery();
@@ -137,7 +137,7 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   /**
    * This is an unit test for a combination of aggregation and distinct aggregation functions.
    */
-  public final void testCountDistinct2() throws Exception {
+  public final void testDistinctAggregation2() throws Exception {
     // select l_orderkey, count(*) as cnt, count(distinct l_linenumber) as unique_key from lineitem group by l_orderkey;
     ResultSet res = executeQuery();
     assertResultSet(res);
@@ -145,6 +145,50 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testDistinctAggregation3() throws Exception {
+    // select count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testDistinctAggregation4() throws Exception {
+    // select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey)
+    // from lineitem group by l_linenumber;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testDistinctAggregation5() throws Exception {
+    // select sum(distinct l_orderkey), l_linenumber, count(distinct l_orderkey), count(*) as total
+    // from lineitem group by l_linenumber;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testDistinctAggregationWithHaving1() throws Exception {
+    // select l_linenumber, count(*), count(distinct l_orderkey), sum(distinct l_orderkey) from lineitem
+    // group by l_linenumber having sum(distinct l_orderkey) >= 6;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testDistinctAggregationWithUnion1() throws Exception {
+    // select sum(distinct l_orderkey), l_linenumber, count(distinct l_orderkey), count(*) as total
+    // from (select * from lineitem union select * from lineitem) group by l_linenumber;
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
   public final void testComplexParameter() throws Exception {
     // select sum(l_extendedprice*l_discount) as revenue from lineitem;
     ResultSet res = executeQuery();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 053c028..d862e87 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -80,7 +80,7 @@ public class TestExecutionBlockCursor {
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
-    planner = new GlobalPlanner(conf, sm);
+    planner = new GlobalPlanner(conf, catalog);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index 2d3124d..4c7c9c5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -33,7 +33,6 @@ import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.junit.AfterClass;
@@ -84,8 +83,7 @@ public class TestGlobalPlanner {
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
     optimizer = new LogicalOptimizer(util.getConfiguration());
-    globalPlanner = new GlobalPlanner(util.getConfiguration(),
-        StorageManagerFactory.getStorageManager(util.getConfiguration()));
+    globalPlanner = new GlobalPlanner(util.getConfiguration(), catalog);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct.sql
deleted file mode 100644
index 6fe604e..0000000
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct.sql
+++ /dev/null
@@ -1 +0,0 @@
-select l_orderkey, max(l_orderkey) as maximum, count(distinct l_linenumber) as unique_key from lineitem group by l_orderkey;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/6053ad11/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct2.sql b/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct2.sql
deleted file mode 100644
index 6bf8a8a..0000000
--- a/tajo-core/tajo-core-backend/src/test/resources/queries/TestGroupByQuery/testCountDistinct2.sql
+++ /dev/null
@@ -1 +0,0 @@
-select l_orderkey, count(*) as cnt, count(distinct l_linenumber) as unique_key from lineitem group by l_orderkey;
\ No newline at end of file


Mime
View raw message