tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-145: count(distinct column) should be supported. (hyunsik)
Date Fri, 27 Sep 2013 16:11:39 GMT
TAJO-145: count(distinct column) should be supported. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/733192f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/733192f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/733192f3

Branch: refs/heads/master
Commit: 733192f3bfac8686055f219d4a5bad71d7948d30
Parents: f4600dd
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Sat Sep 28 01:11:09 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Sat Sep 28 01:11:09 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tajo/catalog/AbstractCatalogClient.java     |  43 ++--
 .../org/apache/tajo/catalog/CatalogService.java |  20 +-
 .../src/main/proto/CatalogProtocol.proto        |   4 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |   7 +-
 .../java/org/apache/tajo/catalog/Schema.java    |   5 +
 .../src/main/proto/CatalogProtos.proto          |  15 +-
 .../org/apache/tajo/catalog/CatalogServer.java  | 223 ++++++++++++++-----
 .../org/apache/tajo/catalog/TestCatalog.java    |  30 ++-
 .../org/apache/tajo/annotation/Nullable.java    |  25 +++
 .../main/java/org/apache/tajo/util/TUtil.java   |  16 ++
 .../org/apache/tajo/engine/eval/FuncEval.java   |   7 +
 .../engine/function/builtin/CountValue.java     |   1 +
 .../function/builtin/CountValueDistinct.java    |  74 ++++++
 .../apache/tajo/engine/planner/LogicalPlan.java |  14 +-
 .../tajo/engine/planner/LogicalPlanner.java     |  40 ++--
 .../engine/planner/PhysicalPlannerImpl.java     |  30 ++-
 .../apache/tajo/engine/planner/PlannerUtil.java | 109 ++++++---
 .../tajo/engine/planner/enforce/Enforcer.java   |  26 ++-
 .../engine/planner/logical/GroupbyNode.java     |  11 +-
 .../planner/rewrite/ProjectionPushDownRule.java |   4 +
 .../org/apache/tajo/master/GlobalPlanner.java   | 158 ++++++++-----
 .../java/org/apache/tajo/master/TajoMaster.java |   4 +-
 .../tajo/master/querymaster/Repartitioner.java  |   2 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |   1 +
 .../apache/tajo/engine/eval/TestEvalTree.java   |   4 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |   4 +-
 .../tajo/engine/function/ExprTestBase.java      |   2 +-
 .../engine/planner/TestLogicalOptimizer.java    |   4 +-
 .../tajo/engine/planner/TestLogicalPlan.java    |   2 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |   4 +-
 .../tajo/engine/planner/TestPlannerUtil.java    |   2 +-
 .../planner/physical/TestBNLJoinExec.java       |   5 +-
 .../planner/physical/TestHashJoinExec.java      |   3 +-
 .../planner/physical/TestMergeJoinExec.java     |   4 +-
 .../planner/physical/TestPhysicalPlanner.java   |  12 +-
 .../tajo/engine/query/TestGroupByQuery.java     |  63 +++++-
 .../apache/tajo/master/TestGlobalPlanner.java   |   2 +-
 38 files changed, 716 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b768f2..219ceb5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -34,6 +34,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-145: count(distinct column) should be supported. (hyunsik)
+
     TAJO-197: Implement Enforcer that forces physical planner to choose
     specified algorithms. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index ac4c796..ad085f8 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -49,8 +49,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
   @Override
   public final TableDesc getTableDesc(final String name) {
     try {
-      return CatalogUtil.newTableDesc(stub.getTableDesc(null, StringProto.newBuilder()
-          .setValue(name).build()));
+      return CatalogUtil.newTableDesc(stub.getTableDesc(null, StringProto.newBuilder().setValue(name).build()));
     } catch (ServiceException e) {
       LOG.error(e);
       return null;
@@ -201,9 +200,9 @@ public abstract class AbstractCatalogClient implements CatalogService {
   }
 
   @Override
-  public final boolean registerFunction(final FunctionDesc funcDesc) {
+  public final boolean createFunction(final FunctionDesc funcDesc) {
     try {
-      return stub.registerFunction(null, funcDesc.getProto()).getValue();
+      return stub.createFunction(null, funcDesc.getProto()).getValue();
     } catch (ServiceException e) {
       LOG.error(e);
       return false;
@@ -211,16 +210,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
   }
 
   @Override
-  public final boolean unregisterFunction(final String signature,
-                                          DataType... paramTypes) {
-    UnregisterFunctionRequest.Builder builder =
-        UnregisterFunctionRequest.newBuilder();
+  public final boolean dropFunction(final String signature) {
+    UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
     builder.setSignature(signature);
-    for (DataType type : paramTypes) {
-      builder.addParameterTypes(type);
-    }
     try {
-      return stub.unregisterFunction(null, builder.build()).getValue();
+      return stub.dropFunction(null, builder.build()).getValue();
     } catch (ServiceException e) {
       LOG.error(e);
       return false;
@@ -228,11 +222,17 @@ public abstract class AbstractCatalogClient implements CatalogService {
   }
 
   @Override
-  public final FunctionDesc getFunction(final String signature,
-                                        DataType... paramTypes) {
-    GetFunctionMetaRequest.Builder builder =
-        GetFunctionMetaRequest.newBuilder();
+  public final FunctionDesc getFunction(final String signature, DataType... paramTypes) {
+    return getFunction(signature, null, paramTypes);
+  }
+
+  @Override
+  public final FunctionDesc getFunction(final String signature, FunctionType funcType, DataType... paramTypes) {
+    GetFunctionMetaRequest.Builder builder = GetFunctionMetaRequest.newBuilder();
     builder.setSignature(signature);
+    if (funcType != null) {
+      builder.setFunctionType(funcType);
+    }
     for (DataType type : paramTypes) {
       builder.addParameterTypes(type);
     }
@@ -253,10 +253,17 @@ public abstract class AbstractCatalogClient implements CatalogService {
   }
 
   @Override
-  public final boolean containFunction(final String signature,
-                                       DataType... paramTypes) {
+  public final boolean containFunction(final String signature, DataType... paramTypes) {
+    return containFunction(signature, null, paramTypes);
+  }
+
+  @Override
+  public final boolean containFunction(final String signature, FunctionType funcType, DataType... paramTypes) {
     ContainFunctionRequest.Builder builder =
         ContainFunctionRequest.newBuilder();
+    if (funcType != null) {
+      builder.setFunctionType(funcType);
+    }
     builder.setSignature(signature);
     for (DataType type : paramTypes) {
       builder.addParameterTypes(type);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
index d0b5f50..02650bc 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -22,6 +22,8 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 
 import java.util.Collection;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
+
 public interface CatalogService {
 
   /**
@@ -76,21 +78,15 @@ public interface CatalogService {
 
   boolean deleteIndex(String indexName);
 
-  boolean registerFunction(FunctionDesc funcDesc);
+  boolean createFunction(FunctionDesc funcDesc);
 
-  boolean unregisterFunction(String signature, DataType... paramTypes);
+  boolean dropFunction(String signature);
 
-  /**
-   *
-   * @param signature
-   * @return
-   */
   FunctionDesc getFunction(String signature, DataType... paramTypes);
 
-  /**
-   *
-   * @param signature
-   * @return
-   */
+  FunctionDesc getFunction(String signature, FunctionType funcType, DataType... paramTypes);
+
   boolean containFunction(String signature, DataType... paramTypes);
+
+  boolean containFunction(String signature, FunctionType funcType, DataType... paramTypes);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
index 7ea2782..6374278 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
+++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
@@ -37,8 +37,8 @@ service CatalogProtocolService {
   rpc getIndexByName(StringProto) returns (IndexDescProto);
   rpc getIndex(GetIndexRequest) returns (IndexDescProto);
   rpc delIndex(StringProto) returns (BoolProto);
-  rpc registerFunction(FunctionDescProto) returns (BoolProto);
-  rpc unregisterFunction(UnregisterFunctionRequest) returns (BoolProto);
+  rpc createFunction(FunctionDescProto) returns (BoolProto);
+  rpc dropFunction(UnregisterFunctionRequest) returns (BoolProto);
   rpc getFunctionMeta(GetFunctionMetaRequest) returns (FunctionDescProto);
   rpc containFunction(ContainFunctionRequest) returns (BoolProto);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index fb55bf7..166a015 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -35,13 +35,11 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import static org.apache.tajo.common.TajoDataTypes.Type;
 
 public class CatalogUtil {
-  public static String getCanonicalName(String signature,
-      Collection<DataType> paramTypes) {
+  public static String getCanonicalName(String signature, Collection<DataType> paramTypes) {
     DataType [] types = paramTypes.toArray(new DataType[paramTypes.size()]);
     return getCanonicalName(signature, types);
   }
-  public static String getCanonicalName(String signature,
-      DataType...paramTypes) {
+  public static String getCanonicalName(String signature, DataType...paramTypes) {
     StringBuilder sb = new StringBuilder(signature);
     sb.append("(");
     int i = 0;
@@ -50,7 +48,6 @@ public class CatalogUtil {
       if(i < paramTypes.length - 1) {
         sb.append(",");
       }
-      
       i++;
     }
     sb.append(")");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index e36fbf3..05ef6df 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -169,8 +169,13 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
 	
 	public boolean contains(String colName) {
 		return fieldsByQialifiedName.containsKey(colName.toLowerCase());
+
 	}
 
+  public boolean containsAll(Collection<Column> columns) {
+    return fields.containsAll(columns);
+  }
+
   public synchronized Schema addColumn(String name, Type type) {
     if (type == Type.CHAR) {
       return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 6ef7613..470c12c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -107,8 +107,12 @@ message TableDescProto {
 }
 
 enum FunctionType {
-	GENERAL = 0;
-	AGGREGATION = 1;
+  GENERAL = 0;
+  AGGREGATION = 1;
+  DISTINCT_AGGREGATION = 2;
+	UDF = 3;
+	UDA = 4;
+	DISTINCT_UDA = 5;
 }
 
 message FunctionDescProto {
@@ -151,17 +155,18 @@ message GetFunctionsResponse {
 
 message UnregisterFunctionRequest {
 	required string signature = 1;
-	repeated DataType parameterTypes = 2;
 }
 
 message GetFunctionMetaRequest {
 	required string signature = 1;
-	repeated DataType parameterTypes = 2;
+	optional FunctionType functionType = 2;
+	repeated DataType parameterTypes = 3;
 }
 
 message ContainFunctionRequest {
 	required string signature = 1;
-	repeated DataType parameterTypes = 2;
+	optional FunctionType functionType = 2;
+	repeated DataType parameterTypes = 3;
 }
 
 message TableStatProto {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 94eae97..6edc03a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
 import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.store.CatalogStore;
 import org.apache.tajo.catalog.store.DBStore;
@@ -39,21 +38,27 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
+
 /**
  * This class provides the catalog service. The catalog service enables clients
  * to register, unregister and access information about tables, functions, and
  * cluster information.
  */
 public class CatalogServer extends AbstractService {
-
   private final static Log LOG = LogFactory.getLog(CatalogServer.class);
   private TajoConf conf;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -61,8 +66,8 @@ public class CatalogServer extends AbstractService {
   private final Lock wlock = lock.writeLock();
 
   private CatalogStore store;
-
-  private Map<String, FunctionDescProto> functions = new HashMap<String, FunctionDescProto>();
+  private Map<String, List<FunctionDescProto>> functions = new ConcurrentHashMap<String,
+      List<FunctionDescProto>>();
 
   // RPC variables
   private ProtoBlockingRpcServer rpcServer;
@@ -120,7 +125,7 @@ public class CatalogServer extends AbstractService {
   private void initBuiltinFunctions(List<FunctionDesc> functions)
       throws ServiceException {
     for (FunctionDesc desc : functions) {
-      handler.registerFunction(null, desc.getProto());
+      handler.createFunction(null, desc.getProto());
     }
   }
 
@@ -161,8 +166,7 @@ public class CatalogServer extends AbstractService {
     return this.bindAddress;
   }
 
-  public class CatalogProtocolHandler
-      implements CatalogProtocolService.BlockingInterface {
+  public class CatalogProtocolHandler implements CatalogProtocolService.BlockingInterface {
 
     @Override
     public TableDescProto getTableDesc(RpcController controller,
@@ -210,10 +214,10 @@ public class CatalogServer extends AbstractService {
     public GetFunctionsResponse getFunctions(RpcController controller,
                                              NullProto request)
         throws ServiceException {
-      Iterator<FunctionDescProto> iterator = functions.values().iterator();
+      Iterator<List<FunctionDescProto>> iterator = functions.values().iterator();
       GetFunctionsResponse.Builder builder = GetFunctionsResponse.newBuilder();
       while (iterator.hasNext()) {
-        builder.addFunctionDesc(iterator.next());
+        builder.addAllFunctionDesc(iterator.next());
       }
       return builder.build();
     }
@@ -397,81 +401,194 @@ public class CatalogServer extends AbstractService {
       return BOOL_TRUE;
     }
 
+    public boolean checkIfBuiltin(FunctionType type) {
+      return type == GENERAL || type == AGGREGATION || type == DISTINCT_AGGREGATION;
+    }
+
+    private boolean containFunction(String signature) {
+      List<FunctionDescProto> found = findFunction(signature);
+      return found != null && found.size() > 0;
+    }
+
+    private boolean containFunction(String signature, FunctionType type, List<DataType> params) {
+      return findFunction(signature, type, params) != null;
+    }
+
+    private List<FunctionDescProto> findFunction(String signature) {
+      return functions.get(signature);
+    }
+
+    private FunctionDescProto findFunction(String signature, List<DataType> params) {
+      if (functions.containsKey(signature)) {
+        for (FunctionDescProto existing : functions.get(signature)) {
+          if (existing.getParameterTypesList().containsAll(params) &&
+              params.containsAll(existing.getParameterTypesList())) {
+            return existing;
+          }
+        }
+      }
+      return null;
+    }
+
+    private FunctionDescProto findFunction(String signature, FunctionType type, List<DataType> params) {
+      if (functions.containsKey(signature)) {
+        for (FunctionDescProto existing : functions.get(signature)) {
+          if (existing.getType() == type &&
+              existing.getParameterTypesList().containsAll(params)
+              && params.containsAll(existing.getParameterTypesList())) {
+            return existing;
+          }
+        }
+      }
+      return null;
+    }
+
+    private FunctionDescProto findFunction(FunctionDescProto target) {
+      return findFunction(target.getSignature(), target.getType(), target.getParameterTypesList());
+    }
+
     @Override
-    public BoolProto registerFunction(RpcController controller,
-                                      FunctionDescProto funcDesc)
+    public BoolProto createFunction(RpcController controller, FunctionDescProto funcDesc)
         throws ServiceException {
-      String canonicalName =
-          CatalogUtil.getCanonicalName(funcDesc.getSignature(),
-              funcDesc.getParameterTypesList());
-      if (functions.containsKey(canonicalName)) {
-        throw new AlreadyExistsFunctionException(canonicalName);
+      FunctionSignature signature = FunctionSignature.create(funcDesc);
+
+      if (functions.containsKey(funcDesc.getSignature())) {
+        FunctionDescProto found = findFunction(funcDesc);
+        if (found != null) {
+          throw new AlreadyExistsFunctionException(signature.toString());
+        }
       }
 
-      functions.put(canonicalName, funcDesc);
+      TUtil.putToNestedList(functions, funcDesc.getSignature(), funcDesc);
       if (LOG.isDebugEnabled()) {
-        LOG.info("Function " + canonicalName + " is registered.");
+        LOG.info("Function " + signature + " is registered.");
       }
 
       return BOOL_TRUE;
     }
 
     @Override
-    public BoolProto unregisterFunction(RpcController controller,
-                                        UnregisterFunctionRequest request)
+    public BoolProto dropFunction(RpcController controller, UnregisterFunctionRequest request)
         throws ServiceException {
-      String signature = request.getSignature();
-      List<DataType> paramTypes = new ArrayList<DataType>();
-      int size = request.getParameterTypesCount();
-      for (int i = 0; i < size; i++) {
-        paramTypes.add(request.getParameterTypes(i));
-      }
-      String canonicalName = CatalogUtil.getCanonicalName(signature, paramTypes);
-      if (!functions.containsKey(canonicalName)) {
-        throw new NoSuchFunctionException(canonicalName);
+
+      if (!containFunction(request.getSignature())) {
+        throw new NoSuchFunctionException(request.getSignature());
       }
 
-      functions.remove(canonicalName);
-      LOG.info("GeneralFunction " + canonicalName + " is unregistered.");
+      functions.remove(request.getSignature());
+      LOG.info(request.getSignature() + " is dropped.");
 
       return BOOL_TRUE;
     }
 
     @Override
-    public FunctionDescProto getFunctionMeta(RpcController controller,
-                                             GetFunctionMetaRequest request)
+    public FunctionDescProto getFunctionMeta(RpcController controller, GetFunctionMetaRequest request)
         throws ServiceException {
-      List<DataType> paramTypes = new ArrayList<DataType>();
-      int size = request.getParameterTypesCount();
-      for (int i = 0; i < size; i++) {
-        paramTypes.add(request.getParameterTypes(i));
-      }
-
-      String key = CatalogUtil.getCanonicalName(
-          request.getSignature().toLowerCase(), paramTypes);
-      if (!functions.containsKey(key)) {
+      if (request.hasFunctionType()) {
+        if (containFunction(request.getSignature(), request.getFunctionType(), request.getParameterTypesList())) {
+          FunctionDescProto desc = findFunction(request.getSignature(), request.getFunctionType(),
+              request.getParameterTypesList());
+          return desc;
+        }
         return null;
       } else {
-        return functions.get(key);
+        FunctionDescProto function = findFunction(request.getSignature(), request.getParameterTypesList());
+        return function;
       }
     }
 
     @Override
-    public BoolProto containFunction(RpcController controller,
-                                     ContainFunctionRequest request)
+    public BoolProto containFunction(RpcController controller, ContainFunctionRequest request)
         throws ServiceException {
-      List<DataType> paramTypes = new ArrayList<DataType>();
-      int size = request.getParameterTypesCount();
-      for (int i = 0; i < size; i++) {
-        paramTypes.add(request.getParameterTypes(i));
+      boolean returnValue;
+      if (request.hasFunctionType()) {
+        returnValue = containFunction(request.getSignature(), request.getFunctionType(),
+            request.getParameterTypesList());
+      } else {
+        returnValue = containFunction(request.getSignature());
       }
-      boolean returnValue =
-          functions.containsKey(CatalogUtil.getCanonicalName(
-              request.getSignature().toLowerCase(), paramTypes));
+
       return BoolProto.newBuilder().setValue(returnValue).build();
     }
   }
 
+  private static class FunctionSignature implements Comparable<FunctionSignature> {
+    private String signature;
+    private FunctionType type;
+    private DataType [] arguments;
+
+    public FunctionSignature(String signature, FunctionType type, List<DataType> arguments) {
+      this.signature = signature;
+      this.type = type;
+      this.arguments = arguments.toArray(new DataType[arguments.size()]);
+    }
+
+    public static FunctionSignature create(FunctionDescProto proto) {
+      return new FunctionSignature(proto.getSignature(), proto.getType(), proto.getParameterTypesList());
+    }
+
+    public static FunctionSignature create (GetFunctionMetaRequest proto) {
+      return new FunctionSignature(proto.getSignature(), proto.getFunctionType(), proto.getParameterTypesList());
+    }
+
+    public static FunctionSignature create(ContainFunctionRequest proto) {
+      return new FunctionSignature(proto.getSignature(), proto.getFunctionType(), proto.getParameterTypesList());
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(signature);
+      sb.append("#").append(type.name());
+      sb.append("(");
+      int i = 0;
+      for (DataType type : arguments) {
+        sb.append(type.getType());
+        sb.append("[").append(type.getLength()).append("]");
+        if(i < arguments.length - 1) {
+          sb.append(",");
+        }
+        i++;
+      }
+      sb.append(")");
+
+      return sb.toString();
+    }
+
+    @Override
+    public int hashCode() {
+      return com.google.common.base.Objects.hashCode(signature, type, arguments);
+    }
+
+    @Override
+    public int compareTo(FunctionSignature o) {
+      int signatureCmp = signature.compareTo(o.signature);
+      if (signatureCmp != 0) {
+        return signatureCmp;
+      }
+
+      int typeCmp = type.compareTo(o.type);
+      if (typeCmp != 0) {
+        return typeCmp;
+      }
+
+      int min = Math.min(arguments.length, o.arguments.length);
+      int argCmp = 0;
+      for (int i = 0; i < min; i++) {
+        if (arguments.length < min && o.arguments.length < min) {
+          argCmp = arguments[i].getType().compareTo(o.arguments[i].getType());
+
+          if (argCmp != 0) {
+            return argCmp;
+          }
+        } else {
+          argCmp = arguments.length - o.arguments.length;
+        }
+      }
+      return argCmp;
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TajoConf conf = new TajoConf();
     CatalogServer catalog = new CatalogServer(new ArrayList<FunctionDesc>());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 2bf944d..bc3bf4b 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -179,40 +179,36 @@ public class TestCatalog {
 
 	@Test
 	public final void testRegisterFunc() throws Exception { 
-		assertFalse(catalog.containFunction("test2"));
-		FunctionDesc meta = new FunctionDesc("test2", TestFunc1.class, FunctionType.GENERAL,
+		assertFalse(catalog.containFunction("test2", FunctionType.UDF));
+		FunctionDesc meta = new FunctionDesc("test2", TestFunc1.class, FunctionType.UDF,
         CatalogUtil.newDataTypesWithoutLen(Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(Type.INT4));
 
-    catalog.registerFunction(meta);
+    catalog.createFunction(meta);
 		assertTrue(catalog.containFunction("test2", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
 		FunctionDesc retrived = catalog.getFunction("test2", CatalogUtil.newDataTypesWithoutLen(Type.INT4));
 
 		assertEquals(retrived.getSignature(),"test2");
 		assertEquals(retrived.getFuncClass(),TestFunc1.class);
-		assertEquals(retrived.getFuncType(),FunctionType.GENERAL);
+		assertEquals(retrived.getFuncType(),FunctionType.UDF);
 	}
 
   @Test
-  public final void testUnregisterFunc() throws Exception {    
-    assertFalse(catalog
-        .containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-    FunctionDesc meta = new FunctionDesc("test3", TestFunc1.class, FunctionType.GENERAL,
+  public final void testDropFunction() throws Exception {
+    assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+    FunctionDesc meta = new FunctionDesc("test3", TestFunc1.class, FunctionType.UDF,
         CatalogUtil.newDataTypesWithoutLen(Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(Type.INT4 ));
-    catalog.registerFunction(meta);
+    catalog.createFunction(meta);
     assertTrue(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
-    catalog.unregisterFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4));
-    assertFalse(catalog
-        .containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
+    catalog.dropFunction("test3");
+    assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4)));
 
-    assertFalse(catalog.containFunction("test3",
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
+    assertFalse(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
     FunctionDesc overload = new FunctionDesc("test3", TestFunc2.class, FunctionType.GENERAL,
         CatalogUtil.newDataTypesWithoutLen(Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB));
-    catalog.registerFunction(overload);
-    assertTrue(catalog.containFunction("test3",
-        CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
+    catalog.createFunction(overload);
+    assertTrue(catalog.containFunction("test3", CatalogUtil.newDataTypesWithoutLen(Type.INT4, Type.BLOB)));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java b/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java
new file mode 100644
index 0000000..d6a757d
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/annotation/Nullable.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.annotation;
+
+/** Marks a parameter or field whose value may be null; declares no members of its own. */
+@java.lang.annotation.Documented
+@java.lang.annotation.Retention(java.lang.annotation.RetentionPolicy.RUNTIME)
+@java.lang.annotation.Target({java.lang.annotation.ElementType.PARAMETER, java.lang.annotation.ElementType.FIELD})
+public @interface Nullable {}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 3931709..76e9608 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -165,4 +165,20 @@ public class TUtil {
       map.put(k1, TUtil.newLinkedHashMap(k2, value));
     }
   }
+
+  /**
+   * Joins the string forms of the given objects, separated by ", ".
+   * An empty array yields "" and a null element is rendered as "null".
+   */
+  public static String arrayToString(Object [] objects) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < objects.length; i++) {
+      // The separator goes between elements only, never before the first one.
+      if (i > 0) {
+        sb.append(", ");
+      }
+      sb.append(objects[i]);
+    }
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
index f6c54cb..afadaad 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FuncEval.java
@@ -27,6 +27,9 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.TUtil;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_AGGREGATION;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.DISTINCT_UDA;
+
 public abstract class FuncEval extends EvalNode implements Cloneable {
 	@Expose protected FunctionDesc funcDesc;
 	@Expose protected EvalNode [] argEvals;
@@ -37,6 +40,10 @@ public abstract class FuncEval extends EvalNode implements Cloneable {
 		this.argEvals = argEvals;
 	}
 
+  public boolean isDistinct() {
+    return funcDesc.getFuncType() == DISTINCT_AGGREGATION || funcDesc.getFuncType() == DISTINCT_UDA;
+  }
+
   @Override
   public EvalContext newContext() {
     FuncCallCtx newCtx = new FuncCallCtx(argEvals);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
index 2b5fb80..608c612 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValue.java
@@ -34,6 +34,7 @@ public final class CountValue extends CountRows {
         new Column("col", Type.ANY)
     });
   }
+
   @Override
   public void eval(FunctionContext ctx, Tuple params) {
     if (!(params.get(0) instanceof NullDatum)) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java
new file mode 100644
index 0000000..05850be
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/CountValueDistinct.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.function.FunctionContext;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.Int8Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Count(distinct column) function. NULL values are never counted, and equal
+ * values are counted once only when they reach merge() consecutively.
+ */
+public final class CountValueDistinct extends CountRows {
+  public CountValueDistinct() {
+    super(new Column[] {new Column("col", Type.ANY)});
+  }
+
+  /** No-op: all counting for this function is done in merge(). */
+  @Override
+  public void eval(FunctionContext context, Tuple params) {
+  }
+
+  @Override
+  public void merge(FunctionContext context, Tuple part) {
+    CountDistinctValueContext state = (CountDistinctValueContext) context;
+    Datum value = part.get(0);
+    // NULL is rejected first so it is skipped even before anything has been counted.
+    if (!(value instanceof NullDatum) && (state.latest == null || !state.latest.equals(value))) {
+      state.latest = value;
+      state.count++;
+    }
+  }
+
+  @Override
+  public Datum getPartialResult(FunctionContext ctx) {
+    return DatumFactory.createInt8(((CountDistinctValueContext) ctx).count);
+  }
+
+  @Override
+  public Int8Datum terminate(FunctionContext ctx) {
+    return DatumFactory.createInt8(((CountDistinctValueContext) ctx).count);
+  }
+
+  @Override
+  public FunctionContext newContext() {
+    return new CountDistinctValueContext();
+  }
+
+  private static class CountDistinctValueContext implements FunctionContext {
+    long count = 0;      // number of distinct non-null values counted so far
+    Datum latest = null; // most recently counted value; null until the first one
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 7e7b45e..3ab98f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -602,7 +602,7 @@ public class LogicalPlan {
       return targetListManager.size();
     }
 
-    public void fillTarget(int idx) throws VerifyException {
+    public void fillTarget(int idx) throws PlanningException {
       targetListManager.update(idx, planner.createTarget(LogicalPlan.this, this, getProjection().getTargets()[idx]));
     }
 
@@ -623,7 +623,7 @@ public class LogicalPlan {
       return targetListManager.getUpdatedSchema();
     }
 
-    public void fillTargets() {
+    public void fillTargets() throws PlanningException {
       for (int i = 0; i < getTargetListNum(); i++) {
         if (!isAlreadyTargetCreated(i)) {
           try {
@@ -713,6 +713,16 @@ public class LogicalPlan {
           // Set the current targets to the GroupByNode because the GroupByNode is the last projection operator.
           GroupbyNode groupbyNode = (GroupbyNode) node;
           groupbyNode.setTargets(getCurrentTargets());
+          boolean distinct = false;
+          for (Target target : groupbyNode.getTargets()) {
+            for (AggFuncCallEval aggrFunc : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
+              if (aggrFunc.isDistinct()) {
+                distinct = true;
+                break;
+              }
+            }
+          }
+          groupbyNode.setDistinct(distinct);
           node.setOutSchema(updateSchema());
 
           // if a having condition is given,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 3edb712..f08855a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -46,6 +46,7 @@ import java.util.List;
 import java.util.Stack;
 
 import static org.apache.tajo.algebra.Aggregation.GroupType;
+import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
 
 /**
@@ -916,7 +917,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    ===============================================================================================*/
 
   public EvalNode createEvalTree(LogicalPlan plan, QueryBlock block, final Expr expr)
-      throws VerifyException {
+      throws PlanningException {
 
     switch(expr.getType()) {
       // constants
@@ -1012,8 +1013,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       case Column:
         return createFieldEval(plan, block, (ColumnReferenceExpr) expr);
 
-      case CountRowsFunction:
-        FunctionDesc countRows = catalog.getFunction("count", new DataType[] {});
+      case CountRowsFunction: {
+        FunctionDesc countRows = catalog.getFunction("count", FunctionType.AGGREGATION, new DataType[] {});
 
         try {
           block.setHasGrouping();
@@ -1024,13 +1025,15 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
           throw new UndefinedFunctionException(CatalogUtil.
               getCanonicalName(countRows.getSignature(), new DataType[]{}));
         }
-
+      }
       case GeneralSetFunction: {
         GeneralSetFunctionExpr setFunction = (GeneralSetFunctionExpr) expr;
         Expr[] params = setFunction.getParams();
         EvalNode[] givenArgs = new EvalNode[params.length];
         DataType[] paramTypes = new DataType[params.length];
 
+        FunctionType functionType = setFunction.isDistinct() ?
+            FunctionType.DISTINCT_AGGREGATION : FunctionType.AGGREGATION;
         givenArgs[0] = createEvalTree(plan, block, params[0]);
         if (setFunction.getSignature().equalsIgnoreCase("count")) {
           paramTypes[0] = CatalogUtil.newDataTypeWithoutLen(TajoDataTypes.Type.ANY);
@@ -1038,12 +1041,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
           paramTypes[0] = givenArgs[0].getValueType()[0];
         }
 
-        if (!catalog.containFunction(setFunction.getSignature(), paramTypes)) {
-          throw new UndefinedFunctionException(CatalogUtil.
-              getCanonicalName(setFunction.getSignature(), paramTypes));
+        if (!catalog.containFunction(setFunction.getSignature(), functionType, paramTypes)) {
+          throw new UndefinedFunctionException(CatalogUtil. getCanonicalName(setFunction.getSignature(), paramTypes));
         }
 
-        FunctionDesc funcDesc = catalog.getFunction(setFunction.getSignature(), paramTypes);
+        FunctionDesc funcDesc = catalog.getFunction(setFunction.getSignature(), functionType, paramTypes);
         if (!block.hasGroupbyNode()) {
           block.setHasGrouping();
         }
@@ -1068,23 +1070,23 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         }
 
         if (!catalog.containFunction(function.getSignature(), paramTypes)) {
-            throw new UndefinedFunctionException(CatalogUtil.
-                getCanonicalName(function.getSignature(), paramTypes));
+            throw new UndefinedFunctionException(CatalogUtil.getCanonicalName(function.getSignature(), paramTypes));
         }
 
         FunctionDesc funcDesc = catalog.getFunction(function.getSignature(), paramTypes);
 
         try {
-          if (funcDesc.getFuncType() == CatalogProtos.FunctionType.GENERAL)
 
-            return new FuncCallEval(funcDesc,
-                (GeneralFunction) funcDesc.newInstance(), givenArgs);
-          else {
+          FunctionType functionType = funcDesc.getFuncType();
+          if (functionType == FunctionType.GENERAL || functionType == FunctionType.UDF) {
+            return new FuncCallEval(funcDesc, (GeneralFunction) funcDesc.newInstance(), givenArgs);
+          } else if (functionType == FunctionType.AGGREGATION || functionType == FunctionType.UDA) {
             if (!block.hasGroupbyNode()) {
               block.setHasGrouping();
             }
-            return new AggFuncCallEval(funcDesc,
-                (AggFunction) funcDesc.newInstance(), givenArgs);
+            return new AggFuncCallEval(funcDesc, (AggFunction) funcDesc.newInstance(), givenArgs);
+          } else if (functionType == FunctionType.DISTINCT_AGGREGATION || functionType == FunctionType.DISTINCT_UDA) {
+            throw new PlanningException("Unsupported function: " + funcDesc.toString());
           }
         } catch (InternalException e) {
           e.printStackTrace();
@@ -1134,7 +1136,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   public CaseWhenEval createCaseWhenEval(LogicalPlan plan, QueryBlock block,
-                                              CaseWhenPredicate caseWhen) throws VerifyException {
+                                              CaseWhenPredicate caseWhen) throws PlanningException {
     CaseWhenEval caseEval = new CaseWhenEval();
     EvalNode condition;
     EvalNode result;
@@ -1153,7 +1155,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   Target[] annotateTargets(LogicalPlan plan, QueryBlock block, org.apache.tajo.algebra.Target [] targets)
-      throws VerifyException {
+      throws PlanningException {
     Target annotatedTargets [] = new Target[targets.length];
 
     for (int i = 0; i < targets.length; i++) {
@@ -1163,7 +1165,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   Target createTarget(LogicalPlan plan, QueryBlock block,
-                             org.apache.tajo.algebra.Target target) throws VerifyException {
+                             org.apache.tajo.algebra.Target target) throws PlanningException {
     if (target.hasAlias()) {
       return new Target(createEvalTree(plan, block, target.getExpr()),
           target.getAlias());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index c1a1b2f..8110a3b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -22,6 +22,7 @@
 package org.apache.tajo.engine.planner;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ObjectArrays;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +40,7 @@ import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.util.IndexUtil;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.List;
@@ -375,7 +377,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       if (algorithm == GroupbyAlgorithm.HASH_AGGREGATION) {
         return createInMemoryHashAggregation(context, groupbyNode, subOp);
       } else {
-        return createSortAggregation(context, groupbyNode, subOp);
+        return createSortAggregation(context, property, groupbyNode, subOp);
       }
     }
     return createBestAggregationPlan(context, groupbyNode, subOp);
@@ -387,19 +389,33 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     return new HashAggregateExec(ctx, groupbyNode, subOp);
   }
 
-  private PhysicalExec createSortAggregation(TaskAttemptContext ctx,GroupbyNode groupbyNode, PhysicalExec subOp)
+  private PhysicalExec createSortAggregation(TaskAttemptContext ctx, EnforceProperty property, GroupbyNode groupbyNode, PhysicalExec subOp)
       throws IOException {
+
     Column[] grpColumns = groupbyNode.getGroupingColumns();
-    SortSpec[] specs = new SortSpec[grpColumns.length];
+    SortSpec[] sortSpecs = new SortSpec[grpColumns.length];
     for (int i = 0; i < grpColumns.length; i++) {
-      specs[i] = new SortSpec(grpColumns[i], true, false);
+      sortSpecs[i] = new SortSpec(grpColumns[i], true, false);
+    }
+
+    if (property != null) {
+      List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
+      SortSpec[] enforcedSortSpecs = new SortSpec[sortSpecProtos.size()];
+      int i = 0;
+
+      for (int j = 0; j < sortSpecProtos.size(); i++, j++) {
+        enforcedSortSpecs[i] = new SortSpec(sortSpecProtos.get(j));
+      }
+
+      sortSpecs = ObjectArrays.concat(sortSpecs, enforcedSortSpecs, SortSpec.class);
     }
-    SortNode sortNode = new SortNode(-1, specs);
+
+    SortNode sortNode = new SortNode(-1, sortSpecs);
     sortNode.setInSchema(subOp.getSchema());
     sortNode.setOutSchema(subOp.getSchema());
     // SortExec sortExec = new SortExec(sortNode, child);
     ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
-    LOG.info("The planner chooses [Sort Aggregation]");
+    LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
     return new SortAggregateExec(ctx, groupbyNode, sortExec);
   }
 
@@ -420,7 +436,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       LOG.info("The planner chooses [Hash Aggregation]");
       return createInMemoryHashAggregation(context, groupbyNode, subOp);
     } else {
-      return createSortAggregation(context, groupbyNode, subOp);
+      return createSortAggregation(context, null, groupbyNode, subOp);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 60243bc..8f44166 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,10 +35,7 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.query.exception.InvalidQueryException;
 import org.apache.tajo.storage.TupleComparator;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 public class PlannerUtil {
   private static final Log LOG = LogFactory.getLog(PlannerUtil.class);
@@ -111,42 +109,64 @@ public class PlannerUtil {
     }
     parentNode.setChild(newNode);
   }
-  
+
   public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
     Preconditions.checkNotNull(groupBy);
 
     GroupbyNode child = null;
+
+    // cloning groupby node
     try {
-      // cloning groupby node
       child = (GroupbyNode) groupBy.clone();
+    } catch (CloneNotSupportedException e) {
+      e.printStackTrace();
+    }
 
-      List<Target> newChildTargets = Lists.newArrayList();
-      Target[] secondTargets = groupBy.getTargets();
-      Target[] firstTargets = child.getTargets();
+    List<Target> firstStepTargets = Lists.newArrayList();
+    Target[] secondTargets = groupBy.getTargets();
+    Target[] firstTargets = child.getTargets();
 
-      Target second;
-      Target first;
-      int targetId =  0;
-      for (int i = 0; i < firstTargets.length; i++) {
-        second = secondTargets[i];
-        first = firstTargets[i];
+    Target second;
+    Target first;
+    int targetId =  0;
+    for (int i = 0; i < firstTargets.length; i++) {
+      second = secondTargets[i];
+      first = firstTargets[i];
 
-        List<AggFuncCallEval> secondFuncs = EvalTreeUtil
-            .findDistinctAggFunction(second.getEvalTree());
-        List<AggFuncCallEval> firstFuncs = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
+      List<AggFuncCallEval> secondStepFunctions = EvalTreeUtil.findDistinctAggFunction(second.getEvalTree());
+      List<AggFuncCallEval> firstStepFunctions = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
 
-        if (firstFuncs.size() == 0) {
-          newChildTargets.add(first);
-          targetId++;
-        } else {
-          for (AggFuncCallEval func : firstFuncs) {
+      if (firstStepFunctions.size() == 0) {
+        firstStepTargets.add(first);
+        targetId++;
+      } else {
+        for (AggFuncCallEval func : firstStepFunctions) {
+          Target newTarget;
+
+          if (func.isDistinct()) {
+            List<Column> fields = EvalTreeUtil.findAllColumnRefs(func);
+            newTarget = new Target(new FieldEval(fields.get(0)));
+            String targetName = "column_" + (targetId++);
+            newTarget.setAlias(targetName);
+
+            AggFuncCallEval secondFunc = null;
+            for (AggFuncCallEval sf : secondStepFunctions) {
+              if (func.equals(sf)) {
+                secondFunc = sf;
+                break;
+              }
+            }
+
+            secondFunc.setArgs(new EvalNode [] {new FieldEval(
+                new Column(targetName, newTarget.getEvalTree().getValueType()[0]))});
+          } else {
             func.setFirstPhase();
-            Target newTarget = new Target(func);
+            newTarget = new Target(func);
             String targetName = "column_" + (targetId++);
             newTarget.setAlias(targetName);
 
             AggFuncCallEval secondFunc = null;
-            for (AggFuncCallEval sf : secondFuncs) {
+            for (AggFuncCallEval sf : secondStepFunctions) {
               if (func.equals(sf)) {
                 secondFunc = sf;
                 break;
@@ -158,23 +178,32 @@ public class PlannerUtil {
               secondFunc.setArgs(new EvalNode [] {new FieldEval(
                   new Column(targetName, newTarget.getEvalTree().getValueType()[0]))});
             }
-            newChildTargets.add(newTarget);
           }
+          firstStepTargets.add(newTarget);
+        }
+      }
+
+      // Getting new target list and updating input/output schema from the new target list.
+      Target[] targetArray = firstStepTargets.toArray(new Target[firstStepTargets.size()]);
+      Schema targetSchema = PlannerUtil.targetToSchema(targetArray);
+      List<Target> newTarget = Lists.newArrayList();
+      for (Column column : groupBy.getGroupingColumns()) {
+        if (!targetSchema.contains(column.getQualifiedName())) {
+          newTarget.add(new Target(new FieldEval(column)));
         }
       }
+      targetArray = ObjectArrays.concat(targetArray, newTarget.toArray(new Target[newTarget.size()]), Target.class);
 
-      Target[] targetArray = newChildTargets.toArray(new Target[newChildTargets.size()]);
       child.setTargets(targetArray);
       child.setOutSchema(PlannerUtil.targetToSchema(targetArray));
       // set the groupby chaining
       groupBy.setChild(child);
       groupBy.setInSchema(child.getOutSchema());
-    } catch (CloneNotSupportedException e) {
-      LOG.error(e);
+
     }
-    
     return child;
   }
+
   
   /**
    * Find the top logical node matched to type from the given node
@@ -402,10 +431,24 @@ public class PlannerUtil {
   }
 
   public static SortSpec[] schemaToSortSpecs(Schema schema) {
-    SortSpec[] specs = new SortSpec[schema.getColumnNum()];
+    return schemaToSortSpecs(schema.toArray());
+  }
 
-    for (int i = 0; i < schema.getColumnNum(); i++) {
-      specs[i] = new SortSpec(schema.getColumn(i), true, false);
+  public static SortSpec[] schemaToSortSpecs(Column [] columns) {
+    SortSpec[] specs = new SortSpec[columns.length];
+
+    for (int i = 0; i < columns.length; i++) {
+      specs[i] = new SortSpec(columns[i], true, false);
+    }
+
+    return specs;
+  }
+
+  public static SortSpec [] columnsToSortSpec(Collection<Column> columns) {
+    SortSpec[] specs = new SortSpec[columns.size()];
+    int i = 0;
+    for (Column column : columns) {
+      specs[i++] = new SortSpec(column, true, false);
     }
 
     return specs;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index a268a39..d7c3ba4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.enforce;
 
 
+import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.util.TUtil;
@@ -29,6 +30,7 @@ import java.util.Map;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
 
 public class Enforcer implements ProtoObject<EnforcerProto> {
   Map<EnforceType, List<EnforceProperty>> properties;
@@ -88,7 +90,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
     TUtil.putToNestedList(properties, builder.getType(), builder.build());
   }
 
-  public void addJoin(int pid, JoinEnforce.JoinAlgorithm algorithm) {
+  public void enforceJoinAlgorithm(int pid, JoinEnforce.JoinAlgorithm algorithm) {
     EnforceProperty.Builder builder = newProperty();
     JoinEnforce.Builder enforce = JoinEnforce.newBuilder();
     enforce.setPid(pid);
@@ -99,18 +101,34 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
     TUtil.putToNestedList(properties, builder.getType(), builder.build());
   }
 
-  public void addGroupby(int pid, GroupbyEnforce.GroupbyAlgorithm algorithm) {
+  public void enforceSortAggregation(int pid, @Nullable SortSpec[] sortSpecs) {
     EnforceProperty.Builder builder = newProperty();
     GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
     enforce.setPid(pid);
-    enforce.setAlgorithm(algorithm);
+    enforce.setAlgorithm(GroupbyAlgorithm.SORT_AGGREGATION);
+    if (sortSpecs != null) {
+      for (SortSpec sortSpec : sortSpecs) {
+        enforce.addSortSpecs(sortSpec.getProto());
+      }
+    }
+
+    builder.setType(EnforceType.GROUP_BY);
+    builder.setGroupby(enforce.build());
+    TUtil.putToNestedList(properties, builder.getType(), builder.build());
+  }
+
+  public void enforceHashAggregation(int pid) {
+    EnforceProperty.Builder builder = newProperty();
+    GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
+    enforce.setPid(pid);
+    enforce.setAlgorithm(GroupbyAlgorithm.HASH_AGGREGATION);
 
     builder.setType(EnforceType.GROUP_BY);
     builder.setGroupby(enforce.build());
     TUtil.putToNestedList(properties, builder.getType(), builder.build());
   }
 
-  public void addSort(int pid, SortEnforce.SortAlgorithm algorithm) {
+  public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) {
     EnforceProperty.Builder builder = newProperty();
     SortEnforce.Builder enforce = SortEnforce.newBuilder();
     enforce.setPid(pid);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 4f9b6d7..2e10353 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -31,6 +31,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   @Expose private Schema havingSchema;
 	@Expose private EvalNode havingCondition = null;
 	@Expose private Target [] targets;
+  @Expose private boolean hasDistinct = false;
 	
 	public GroupbyNode(int pid, final Column [] columns) {
 		super(pid, NodeType.GROUP_BY);
@@ -45,10 +46,18 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   public final boolean isEmptyGrouping() {
     return columns == null || columns.length == 0;
   }
-	
+
 	public final Column [] getGroupingColumns() {
 	  return this.columns;
 	}
+
+  public final boolean isDistinct() {
+    return hasDistinct;
+  }
+
+  public void setDistinct(boolean distinct) {
+    hasDistinct = distinct;
+  }
 	
 	public final boolean hasHavingCondition() {
 	  return this.havingCondition != null;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 0570a88..6ab72cb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -189,6 +189,10 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
       currentRequired.addAll(EvalTreeUtil.findDistinctRefColumns(target.getEvalTree()));
     }
 
+    for (Column column : node.getGroupingColumns()) {
+      currentRequired.add(column);
+    }
+
     PushDownContext groupByContext = new PushDownContext(context);
     groupByContext.upperRequired = currentRequired;
     return pushDownCommonPost(groupByContext, node, stack);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
index 37c77b7..7ce929c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.master;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -26,6 +27,8 @@ import org.apache.tajo.DataChannel;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AggFuncCallEval;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
@@ -126,7 +129,7 @@ public class GlobalPlanner {
 
     return currentBlock;
   }
-  
+
   public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
     Preconditions.checkArgument(channel.getSchema() != null,
         "Channel schema (" + channel.getSrcId().getId() +" -> "+ channel.getTargetId().getId()+") is not initialized");
@@ -194,69 +197,33 @@ public class GlobalPlanner {
     ExecutionBlock currentBlock = null;
     GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
 
-    GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
-    firstGroupBy.setHavingCondition(null);
-
-    if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
-        ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
-
-      UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
-      ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
-      UnionsFinderContext finderContext = new UnionsFinderContext();
-      finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
-
+    if (groupByNode.isDistinct()) {
+      if (childBlock == null) { // first repartition node
+        childBlock = masterPlan.newExecutionBlock();
+      }
+      childBlock.setPlan(groupByNode.getChild());
       currentBlock = masterPlan.newExecutionBlock();
-      GroupbyNode secondGroupBy = groupByNode;
-      for (UnionNode union : finderContext.unionList) {
-        TableSubQueryNode leftSubQuery = union.getLeftChild();
-        TableSubQueryNode rightSubQuery = union.getRightChild();
-        DataChannel dataChannel;
-        if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
-          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-          GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-          g1.setChild(leftSubQuery);
-          execBlock.setPlan(g1);
-          dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
 
-          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-          secondGroupBy.setChild(scanNode);
-          masterPlan.addConnect(dataChannel);
-        }
-        if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
-          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-          GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-          g1.setChild(rightSubQuery);
-          execBlock.setPlan(g1);
-          dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+      LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
 
-          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-          secondGroupBy.setChild(scanNode);
-          masterPlan.addConnect(dataChannel);
+      for (Target target : groupByNode.getTargets()) {
+        List<AggFuncCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
+        for (AggFuncCallEval function : functions) {
+          if (function.isDistinct()) {
+            columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+          }
         }
       }
-      LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-      if (parent instanceof UnaryNode && parent != secondGroupBy) {
-        ((UnaryNode)parent).setChild(secondGroupBy);
-      }
-      currentBlock.setPlan(currentNode);
-    } else {
 
-      if (childBlock == null) { // first repartition node
-        childBlock = masterPlan.newExecutionBlock();
-      }
-      childBlock.setPlan(firstGroupBy);
-
-      currentBlock = masterPlan.newExecutionBlock();
+      Set<Column> existingColumns = Sets.newHashSet(groupByNode.getGroupingColumns());
+      columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
+      SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
+      currentBlock.getEnforcer().enforceSortAggregation(groupByNode.getPID(), sortSpecs);
 
       DataChannel channel;
-      if (firstGroupBy.isEmptyGrouping()) {
-        channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
-        channel.setPartitionKey(firstGroupBy.getGroupingColumns());
-      } else {
-        channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-        channel.setPartitionKey(firstGroupBy.getGroupingColumns());
-      }
-      channel.setSchema(firstGroupBy.getOutSchema());
+      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+      channel.setPartitionKey(groupByNode.getGroupingColumns());
+      channel.setSchema(groupByNode.getInSchema());
 
       GroupbyNode secondGroupBy = groupByNode;
       ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
@@ -269,6 +236,85 @@ public class GlobalPlanner {
 
       masterPlan.addConnect(channel);
       currentBlock.setPlan(currentNode);
+      
+    } else {
+
+      GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
+      firstGroupBy.setHavingCondition(null);
+
+      if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+          ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+
+        UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
+        ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
+        UnionsFinderContext finderContext = new UnionsFinderContext();
+        finder.visitChild(masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>(), finderContext);
+
+        currentBlock = masterPlan.newExecutionBlock();
+        GroupbyNode secondGroupBy = groupByNode;
+        for (UnionNode union : finderContext.unionList) {
+          TableSubQueryNode leftSubQuery = union.getLeftChild();
+          TableSubQueryNode rightSubQuery = union.getRightChild();
+          DataChannel dataChannel;
+          if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
+            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+            g1.setChild(leftSubQuery);
+            execBlock.setPlan(g1);
+            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+            secondGroupBy.setChild(scanNode);
+            masterPlan.addConnect(dataChannel);
+          }
+          if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
+            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
+            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
+            g1.setChild(rightSubQuery);
+            execBlock.setPlan(g1);
+            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
+
+            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+            secondGroupBy.setChild(scanNode);
+            masterPlan.addConnect(dataChannel);
+          }
+        }
+        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+        if (parent instanceof UnaryNode && parent != secondGroupBy) {
+          ((UnaryNode)parent).setChild(secondGroupBy);
+        }
+        currentBlock.setPlan(currentNode);
+      } else {
+
+        if (childBlock == null) { // first repartition node
+          childBlock = masterPlan.newExecutionBlock();
+        }
+        childBlock.setPlan(firstGroupBy);
+
+        currentBlock = masterPlan.newExecutionBlock();
+
+        DataChannel channel;
+        if (firstGroupBy.isEmptyGrouping()) {
+          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
+          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+        } else {
+          channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+        }
+        channel.setSchema(firstGroupBy.getOutSchema());
+
+        GroupbyNode secondGroupBy = groupByNode;
+        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+        secondGroupBy.setChild(scanNode);
+
+        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
+        if (parent instanceof UnaryNode && parent != secondGroupBy) {
+          ((UnaryNode)parent).setChild(secondGroupBy);
+        }
+
+        masterPlan.addConnect(channel);
+        currentBlock.setPlan(currentNode);
+      }
     }
 
     return new ExecutionBlock [] {currentBlock, childBlock};

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index 1296ea4..ccb5ddb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -278,6 +277,9 @@ public class TajoMaster extends CompositeService {
     sqlFuncs.add(new FunctionDesc("count", CountRows.class, FunctionType.AGGREGATION,
         CatalogUtil.newDataTypesWithoutLen(Type.INT8),
         CatalogUtil.newDataTypesWithoutLen()));
+    sqlFuncs.add(new FunctionDesc("count", CountValueDistinct.class, FunctionType.DISTINCT_AGGREGATION,
+        CatalogUtil.newDataTypesWithoutLen(Type.INT8),
+        CatalogUtil.newDataTypesWithoutLen(Type.ANY)));
 
     // GeoIP
     sqlFuncs.add(new FunctionDesc("in_country", InCountry.class, FunctionType.GENERAL,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 7911925..7a956e5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -469,7 +469,7 @@ public class Repartitioner {
       }
     }
 
-    GroupbyNode groupby = (GroupbyNode) childSubQuery.getBlock().getPlan();
+    GroupbyNode groupby = PlannerUtil.findTopNode(subQuery.getBlock().getPlan(), NodeType.GROUP_BY);
     // the number of tasks cannot exceed the number of merged fetch uris.
     int determinedTaskNum = Math.min(maxNum, finalFetchURI.size());
     if (groupby.getGroupingColumns().length == 0) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index ba5f342..4d75e46 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -220,6 +220,7 @@ message GroupbyEnforce {
 
   required int32 pid = 1;
   required GroupbyAlgorithm algorithm = 2;
+  repeated SortSpecProto sortSpecs = 3;
 }
 
 message SortEnforce {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index b3be308..7d14e4c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -57,7 +57,7 @@ public class TestEvalTree {
     util.startCatalogCluster();
     cat = util.getMiniCatalogCluster().getCatalog();
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      cat.registerFunction(funcDesc);
+      cat.createFunction(funcDesc);
     }
 
     Schema schema = new Schema();
@@ -72,7 +72,7 @@ public class TestEvalTree {
     FunctionDesc funcMeta = new FunctionDesc("test_sum", TestSum.class, FunctionType.GENERAL,
         CatalogUtil.newDataTypesWithoutLen(INT4),
         CatalogUtil.newDataTypesWithoutLen(INT4, INT4));
-    cat.registerFunction(funcMeta);
+    cat.createFunction(funcMeta);
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(cat);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 40c4875..079e653 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -64,7 +64,7 @@ public class TestEvalTreeUtil {
     util.startCatalogCluster();
     catalog = util.getMiniCatalogCluster().getCatalog();
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      catalog.registerFunction(funcDesc);
+      catalog.createFunction(funcDesc);
     }
 
     Schema schema = new Schema();
@@ -80,7 +80,7 @@ public class TestEvalTreeUtil {
         FunctionType.GENERAL,
         CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(TajoDataTypes.Type.INT4, TajoDataTypes.Type.INT4));
-    catalog.registerFunction(funcMeta);
+    catalog.createFunction(funcMeta);
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
index 2ba6ee6..c3e5d00 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/ExprTestBase.java
@@ -56,7 +56,7 @@ public class ExprTestBase {
     util.startCatalogCluster();
     cat = util.getMiniCatalogCluster().getCatalog();
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      cat.registerFunction(funcDesc);
+      cat.createFunction(funcDesc);
     }
 
     analyzer = new SQLAnalyzer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 72cbfc2..9ef6ab0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -49,7 +49,7 @@ public class TestLogicalOptimizer {
     util.startCatalogCluster();
     catalog = util.getMiniCatalogCluster().getCatalog();
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      catalog.registerFunction(funcDesc);
+      catalog.createFunction(funcDesc);
     }
     
     Schema schema = new Schema();
@@ -85,7 +85,7 @@ public class TestLogicalOptimizer {
         CatalogUtil.newDataTypesWithoutLen(Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(Type.INT4));
 
-    catalog.registerFunction(funcDesc);
+    catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
     optimizer = new LogicalOptimizer();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 9526c66..11775c1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -52,7 +52,7 @@ public class TestLogicalPlan {
     util.startCatalogCluster();
     catalog = util.getMiniCatalogCluster().getCatalog();
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      catalog.registerFunction(funcDesc);
+      catalog.createFunction(funcDesc);
     }
 
     // TPC-H Schema for Complex Queries

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index b708213..06bc52f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -59,7 +59,7 @@ public class TestLogicalPlanner {
     util.startCatalogCluster();
     catalog = util.getMiniCatalogCluster().getCatalog();
     for (FunctionDesc funcDesc : TajoMaster.initBuiltinFunctions()) {
-      catalog.registerFunction(funcDesc);
+      catalog.createFunction(funcDesc);
     }
 
     Schema schema = new Schema();
@@ -108,7 +108,7 @@ public class TestLogicalPlanner {
       catalog.addTable(d);
     }
 
-    catalog.registerFunction(funcDesc);
+    catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 46fd648..130c2f7 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -85,7 +85,7 @@ public class TestPlannerUtil {
         CatalogUtil.newDataTypesWithoutLen(Type.INT4),
         CatalogUtil.newDataTypesWithoutLen(Type.INT4));
 
-    catalog.registerFunction(funcDesc);
+    catalog.createFunction(funcDesc);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/733192f3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 425418f..b311c9d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -35,7 +35,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -139,7 +138,7 @@ public class TestBNLJoinExec {
     LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
-    enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
 
     Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(),
         Integer.MAX_VALUE);
@@ -180,7 +179,7 @@ public class TestBNLJoinExec {
 
     JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
     Enforcer enforcer = new Enforcer();
-    enforcer.addJoin(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
     TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),


Mime
View raw message