tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [4/4] tajo git commit: TAJO-269: Protocol buffer De/Serialization for LogicalNode.
Date Tue, 30 Dec 2014 12:55:23 GMT
TAJO-269: Protocol buffer De/Serialization for LogicalNode.

Closes #322


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/32be38d4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/32be38d4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/32be38d4

Branch: refs/heads/master
Commit: 32be38d41affc498b01286938f3fea89a8def1a9
Parents: 6fde9e5
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Tue Dec 30 21:52:53 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Tue Dec 30 21:53:29 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../java/org/apache/tajo/catalog/Schema.java    |  33 +-
 .../java/org/apache/tajo/catalog/TableDesc.java |   2 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |  20 +
 .../java/org/apache/tajo/util/ProtoUtil.java    |  19 +
 .../org/apache/tajo/util/ReflectionUtil.java    |  61 +-
 .../main/java/org/apache/tajo/util/TUtil.java   |  41 ++
 .../engine/codegen/ExecutorPreCompiler.java     |   8 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  17 +-
 .../tajo/engine/planner/enforce/Enforcer.java   |  12 +-
 .../engine/planner/global/GlobalPlanner.java    |  36 +-
 .../global/builder/DistinctGroupbyBuilder.java  |  32 +-
 .../BaseGlobalPlanRewriteRuleProvider.java      |  39 +
 .../rewriter/GlobalPlanRewriteEngine.java       |  84 +++
 .../global/rewriter/GlobalPlanRewriteRule.java  |  49 ++
 .../rewriter/GlobalPlanRewriteRuleProvider.java |  33 +
 .../rewriter/GlobalPlanTestRuleProvider.java    |  44 ++
 .../rules/GlobalPlanEqualityTester.java         |  63 ++
 .../DistinctGroupbyFirstAggregationExec.java    |   2 +-
 .../DistinctGroupbyHashAggregationExec.java     |   4 +-
 .../DistinctGroupbySecondAggregationExec.java   |   2 +-
 .../DistinctGroupbySortAggregationExec.java     |   4 +-
 .../DistinctGroupbyThirdAggregationExec.java    |   2 +-
 .../apache/tajo/engine/query/TaskRequest.java   |   3 +-
 .../tajo/engine/query/TaskRequestImpl.java      |  28 +-
 .../utils/test/ErrorInjectionRewriter.java      |  10 +-
 .../tajo/master/DefaultTaskScheduler.java       |  12 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   1 +
 .../apache/tajo/master/exec/QueryExecutor.java  |   6 +-
 .../master/querymaster/QueryMasterTask.java     |   9 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   4 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |  14 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  15 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |  14 +-
 .../tajo/engine/query/TestGroupByQuery.java     |  59 +-
 .../tajo/engine/query/TestSelectQuery.java      |  27 +-
 .../tajo/engine/query/TestTruncateTable.java    |   8 +-
 .../tajo/engine/query/TestWindowQuery.java      |   6 +-
 .../apache/tajo/master/TestGlobalPlanner.java   |   6 +-
 .../org/apache/tajo/plan/LogicalOptimizer.java  |  52 +-
 .../tajo/plan/LogicalPlanPreprocessor.java      |   4 +-
 .../org/apache/tajo/plan/LogicalPlanner.java    |  13 +-
 .../main/java/org/apache/tajo/plan/Target.java  |  13 +-
 .../plan/expr/AggregationFunctionCallEval.java  |  38 +-
 .../org/apache/tajo/plan/expr/EvalNode.java     |  10 +-
 .../tajo/plan/expr/WindowFunctionEval.java      |  12 +
 .../tajo/plan/logical/AlterTableNode.java       |  10 +
 .../tajo/plan/logical/AlterTablespaceNode.java  |  13 +-
 .../apache/tajo/plan/logical/BinaryNode.java    |  16 +
 .../tajo/plan/logical/CreateDatabaseNode.java   |  10 +
 .../tajo/plan/logical/CreateTableNode.java      |  11 +-
 .../tajo/plan/logical/DistinctGroupbyNode.java  |  39 +-
 .../tajo/plan/logical/DropDatabaseNode.java     |  17 +-
 .../apache/tajo/plan/logical/DropTableNode.java |  10 +
 .../apache/tajo/plan/logical/EvalExprNode.java  |  12 +-
 .../apache/tajo/plan/logical/GroupbyNode.java   |  47 +-
 .../apache/tajo/plan/logical/InsertNode.java    |  15 +-
 .../apache/tajo/plan/logical/LogicalNode.java   |  16 +-
 .../org/apache/tajo/plan/logical/NodeType.java  |   6 +-
 .../tajo/plan/logical/ProjectionNode.java       |  12 +-
 .../apache/tajo/plan/logical/RelationNode.java  |   2 +-
 .../org/apache/tajo/plan/logical/ScanNode.java  |  13 +-
 .../tajo/plan/logical/SetSessionNode.java       |  22 +-
 .../tajo/plan/logical/StoreTableNode.java       |  11 +-
 .../tajo/plan/logical/TableSubQueryNode.java    |  12 +-
 .../tajo/plan/logical/TruncateTableNode.java    |  10 +
 .../org/apache/tajo/plan/logical/UnaryNode.java |  14 +
 .../apache/tajo/plan/logical/WindowSpec.java    |  57 +-
 .../tajo/plan/nameresolver/NameResolver.java    |   6 +-
 .../plan/nameresolver/ResolverByLegacy.java     |   2 +-
 .../rewrite/BaseLogicalPlanRewriteEngine.java   |  89 +++
 .../BaseLogicalPlanRewriteRuleProvider.java     |  59 ++
 .../plan/rewrite/BasicQueryRewriteEngine.java   |  72 --
 .../plan/rewrite/LogicalPlanRewriteEngine.java  |  33 +
 .../plan/rewrite/LogicalPlanRewriteRule.java    |  57 ++
 .../rewrite/LogicalPlanRewriteRuleProvider.java |  44 ++
 .../rewrite/LogicalPlanTestRuleProvider.java    |  44 ++
 .../tajo/plan/rewrite/QueryRewriteEngine.java   |  32 -
 .../apache/tajo/plan/rewrite/RewriteRule.java   |  56 --
 .../plan/rewrite/rules/FilterPushDownRule.java  |   9 +-
 .../rules/LogicalPlanEqualityTester.java        |  55 ++
 .../rewrite/rules/PartitionedTableRewriter.java |  44 +-
 .../rewrite/rules/ProjectionPushDownRule.java   |  11 +-
 .../tajo/plan/serder/EvalNodeDeserializer.java  | 301 ++++++++
 .../tajo/plan/serder/EvalNodeSerializer.java    | 397 ++++++++++
 .../plan/serder/EvalTreeProtoDeserializer.java  | 218 ------
 .../plan/serder/EvalTreeProtoSerializer.java    | 310 --------
 .../plan/serder/LogicalNodeDeserializer.java    | 678 +++++++++++++++++
 .../tajo/plan/serder/LogicalNodeSerializer.java | 724 +++++++++++++++++++
 .../apache/tajo/plan/serder/package-info.java   |  23 +
 .../org/apache/tajo/plan/util/PlannerUtil.java  |  16 +-
 .../plan/visitor/BasicLogicalPlanVisitor.java   |  39 +-
 .../plan/visitor/ExplainLogicalPlanVisitor.java |   5 +-
 .../tajo/plan/visitor/LogicalPlanVisitor.java   |   7 +-
 tajo-plan/src/main/proto/Plan.proto             | 363 ++++++++--
 .../org/apache/tajo/storage/StorageManager.java |   4 +-
 .../storage/hbase/AddSortForInsertRewriter.java |  10 +-
 .../tajo/storage/hbase/HBaseStorageManager.java |   6 +-
 98 files changed, 3960 insertions(+), 1102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 013bb25..25d6e55 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-269: Protocol buffer De/Serialization for LogicalNode. (hyunsik)
+
     TAJO-1266: Too many logs when writing a parquet relation. 
     (DaeMyung Kang via jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 672b8e3..71c1b01 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -199,24 +199,21 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
   }
 	
 	public int getColumnId(String name) {
-    String [] parts = name.split("\\.");
-    if (parts.length == 2 || parts.length == 3) {
-      if (fieldsByQualifiedName.containsKey(name)) {
-        return fieldsByQualifiedName.get(name);
-      } else {
-        return -1;
-      }
-    } else {
-      List<Integer> list = fieldsByName.get(name);
-      if (list == null) {
-        return -1;
-      } else  if (list.size() == 1) {
-        return fieldsByName.get(name).get(0);
-      } else if (list.size() == 0) {
-        return -1;
-      } else { // if list.size > 2
-        throw throwAmbiguousFieldException(list);
-      }
+    // if the same column exists, immediately return that column.
+    if (fieldsByQualifiedName.containsKey(name)) {
+      return fieldsByQualifiedName.get(name);
+    }
+
+    // The following is some workaround code.
+    List<Integer> list = fieldsByName.get(name);
+    if (list == null) {
+      return -1;
+    } else  if (list.size() == 1) {
+      return fieldsByName.get(name).get(0);
+    } else if (list.size() == 0) {
+      return -1;
+    } else { // if list.size > 2
+      throw throwAmbiguousFieldException(list);
     }
 	}
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index ce167e1..ec679f9 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -168,7 +168,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
       boolean eq = tableName.equals(other.tableName);
       eq = eq && schema.equals(other.schema);
       eq = eq && meta.equals(other.meta);
-      eq = eq && uri.equals(other.uri);
+      eq = eq && TUtil.checkEquals(uri, other.uri);
       eq = eq && TUtil.checkEquals(partitionMethodDesc, other.partitionMethodDesc);
       eq = eq && TUtil.checkEquals(external, other.external);
       return eq && TUtil.checkEquals(stats, other.stats);

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index d0c6460..ab11ddd 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -249,6 +249,12 @@ public class TajoConf extends Configuration {
     TASK_DEFAULT_SIZE("tajo.task.size-mb", 128),
 
     // Query and Optimization -------------------------------------------------
+    // This class provides a ordered list of logical plan rewrite rule classes.
+    LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS("tajo.plan.logical.rewriter.provider",
+        "org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteRuleProvider"),
+    // This class provides a ordered list of global plan rewrite rule classes.
+    GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS("tajo.plan.global.rewriter.provider",
+        "org.apache.tajo.engine.planner.global.rewriter.BaseGlobalPlanRewriteRuleProvider"),
     EXECUTOR_EXTERNAL_SORT_THREAD_NUM("tajo.executor.external-sort.thread-num", 1),
     EXECUTOR_EXTERNAL_SORT_FANOUT("tajo.executor.external-sort.fanout-num", 8),
 
@@ -561,6 +567,20 @@ public class TajoConf extends Configuration {
     setBoolVar(this, var, val);
   }
 
+  public void setClassVar(ConfVars var, Class<?> clazz) {
+    setVar(var, clazz.getCanonicalName());
+  }
+
+  public Class<?> getClassVar(ConfVars var) {
+    String valueString = getVar(var);
+
+    try {
+      return getClassByName(valueString);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   public static String getVar(Configuration conf, ConfVars var) {
     return conf.get(var.varname, var.defaultVal);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java
index dbc987d..f9d759b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ProtoUtil.java
@@ -18,7 +18,11 @@
 
 package org.apache.tajo.util;
 
+import com.google.common.collect.Lists;
+import org.apache.tajo.common.ProtoObject;
+
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.*;
@@ -52,4 +56,19 @@ public class ProtoUtil {
   public static KeyValueSetProto convertFromMap(Map<String, String> map) {
     return new KeyValueSet(map).getProto();
   }
+
+  /**
+   * It converts an array of ProtoObjects into Iteratable one.
+   *
+   * @param protoObjects
+   * @param <T>
+   * @return
+   */
+  public static <T> Iterable<T> toProtoObjects(ProtoObject[] protoObjects) {
+    List<T> converted = Lists.newArrayList();
+    for (int i = 0; i < protoObjects.length; i++) {
+      converted.add((T) protoObjects[i].getProto());
+    }
+    return converted;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
index eccc61f..e2def69 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
@@ -18,22 +18,71 @@
 
 package org.apache.tajo.util;
 
+import org.apache.tajo.conf.TajoConf;
+
 import java.lang.reflect.Constructor;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class ReflectionUtil {
-  private static final Class<?>[] EMPTY_ARRAY = new Class[]{};
+  private static final Class<?>[] EMPTY_PARAM = new Class[]{};
+  private static final Object [] EMPTY_OBJECT = new Object[] {};
+  private static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class};
 
   /**
-   * Cache of constructors for each class. Pins the classes so they
+   * Caches of constructors for each class. Pins the classes so they
    * can't be garbage collected until ReflectionUtils can be collected.
+   *
+   * EMPTY_CONSTRUCTOR_CACHE keeps classes which don't have any parameterized constructor, and
+   * CONF_CONSTRUCTOR_CACHE keeps classes which have one constructor to take TajoConf.
    */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+  private static final Map<Class<?>, Constructor<?>> EMPTY_CONSTRUCTOR_CACHE =
+      new ConcurrentHashMap<Class<?>, Constructor<?>>();
+  private static final Map<Class<?>, Constructor<?>> CONF_CONSTRUCTOR_CACHE =
       new ConcurrentHashMap<Class<?>, Constructor<?>>();
 
-	public static Object newInstance(Class<?> clazz)
-			throws InstantiationException, IllegalAccessException {
-		return clazz.newInstance();
+  /**
+   * Initialize an instance by a given class
+   *
+   * @param clazz Class to be initialized
+   * @return initialized object
+   */
+	public static <T> T newInstance(Class<? extends T> clazz) {
+    try {
+      Constructor<?> constructor;
+      if (EMPTY_CONSTRUCTOR_CACHE.containsKey(clazz)) {
+        constructor = EMPTY_CONSTRUCTOR_CACHE.get(clazz);
+      } else {
+        constructor = clazz.getConstructor(EMPTY_PARAM);
+        EMPTY_CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+
+      return (T) constructor.newInstance(EMPTY_OBJECT);
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
 	}
+
+  /**
+   * Initialize an instance by a given class with TajoConf parameter
+   *
+   * @param clazz Class to be initialized
+   * @param conf TajoConf instance
+   * @return initialized object
+   */
+  public static <T> T newInstance(Class<? extends T> clazz, TajoConf conf) {
+    try {
+      Constructor<?> constructor;
+      if (CONF_CONSTRUCTOR_CACHE.containsKey(clazz)) {
+        constructor = CONF_CONSTRUCTOR_CACHE.get(clazz);
+      } else {
+        constructor = clazz.getConstructor(CONF_PARAM);
+        CONF_CONSTRUCTOR_CACHE.put(clazz, constructor);
+      }
+
+      return (T) constructor.newInstance(new Object[]{conf});
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 0ceb2b2..a1de860 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -42,6 +42,37 @@ public class TUtil {
   }
 
   /**
+   * check two collections as equals. It also check the equivalence of null.
+   * It will return true even if they are all null.
+   *
+   * @param s1 the first collection to be compared.
+   * @param s2 the second collection to be compared
+   * @return true if they are equal or all null
+   */
+  public static boolean checkEquals(Collection<?> s1, Collection<?> s2) {
+    if (s1 == null ^ s2 == null) {
+      return false;
+    } else if (s1 == null && s2 == null) {
+      return true;
+    } else {
+      if (s1.size() == 0 && s2.size() == 0) {
+        return true;
+      } else if (s1.size() == s2.size()) {
+        Iterator<?> it1 = s1.iterator();
+        Iterator<?> it2 = s2.iterator();
+        Object o1;
+        Object o2;
+        for (o1 = it1.next(), o2 = it2.next(); it1.hasNext() && it2.hasNext(); o1 = it1.next(), o2 = it2.next()) {
+          if (!o1.equals(o2)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
    * check two arrays as equals. It also check the equivalence of null.
    * It will return true even if they are all null.
    *
@@ -59,6 +90,16 @@ public class TUtil {
     }
   }
 
+  public static boolean checkEquals(int [] s1, int [] s2) {
+    if (s1 == null ^ s2 == null) {
+      return false;
+    } else if (s1 == null && s2 == null) {
+      return true;
+    } else {
+      return Arrays.equals(s1, s2);
+    }
+  }
+
   public static <T> T[] concat(T[] first, T[] second) {
     T[] result = Arrays.copyOf(first, first.length + second.length);
     System.arraycopy(second, 0, result, first.length, second.length);

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
index d588e7f..79513dc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/codegen/ExecutorPreCompiler.java
@@ -149,9 +149,9 @@ public class ExecutorPreCompiler extends BasicLogicalPlanVisitor<ExecutorPreComp
     return node;
   }
 
-  public LogicalNode visitDistinct(CompilationContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                   DistinctGroupbyNode node, Stack<LogicalNode> stack) throws PlanningException {
-    super.visitDistinct(context, plan, block, node, stack);
+  public LogicalNode visitDistinctGroupby(CompilationContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                          DistinctGroupbyNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitDistinctGroupby(context, plan, block, node, stack);
 
     compileProjectableNode(context, node.getInSchema(), node);
     return node;
@@ -190,7 +190,7 @@ public class ExecutorPreCompiler extends BasicLogicalPlanVisitor<ExecutorPreComp
 
     if (node.hasTargets()) {
       for (Target target : node.getTargets()) {
-        compileIfAbsent(context, node.getTableSchema(), target.getEvalTree());
+        compileIfAbsent(context, node.getLogicalSchema(), target.getEvalTree());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 2a34637..d043a27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -34,6 +34,7 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.physical.*;
@@ -877,7 +878,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
 
       boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName());
-      SortSpec [] sortSpecs = PlannerUtil.convertSortSpecs(sortEnforcer.getSortSpecsList());
+      SortSpec [] sortSpecs = LogicalNodeDeserializer.convertSortSpecs(sortEnforcer.getSortSpecsList());
       return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs);
     } else {
       return false;
@@ -1089,7 +1090,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
     if (phase == 3) {
       sortSpecs.add(new SortSpec(distinctNode.getTargets()[0].getNamedColumn()));
     }
-    for (GroupbyNode eachGroupbyNode: distinctNode.getGroupByNodes()) {
+    for (GroupbyNode eachGroupbyNode: distinctNode.getSubPlans()) {
       for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
         sortSpecs.add(new SortSpec(eachColumn));
       }
@@ -1110,7 +1111,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   private PhysicalExec createSortAggregationDistinctGroupbyExec(TaskAttemptContext ctx,
       DistinctGroupbyNode distinctGroupbyNode, PhysicalExec subOp,
       DistinctGroupbyEnforcer enforcer) throws IOException {
-    List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getGroupByNodes();
+    List<GroupbyNode> groupbyNodes = distinctGroupbyNode.getSubPlans();
 
     SortAggregateExec[] sortAggregateExec = new SortAggregateExec[groupbyNodes.size()];
 
@@ -1216,15 +1217,15 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       List<EnforceProperty> properties = enforcer.getEnforceProperties(type);
       EnforceProperty found = null;
       for (EnforceProperty property : properties) {
-        if (type == EnforceType.JOIN && property.getJoin().getPid() == node.getPID()) {
+        if (type == EnforceType.JOIN && property.getJoin().getNodeId() == node.getPID()) {
           found = property;
-        } else if (type == EnforceType.GROUP_BY && property.getGroupby().getPid() == node.getPID()) {
+        } else if (type == EnforceType.GROUP_BY && property.getGroupby().getNodeId() == node.getPID()) {
           found = property;
-        } else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getPid() == node.getPID()) {
+        } else if (type == EnforceType.DISTINCT_GROUP_BY && property.getDistinct().getNodeId() == node.getPID()) {
           found = property;
-        } else if (type == EnforceType.SORT && property.getSort().getPid() == node.getPID()) {
+        } else if (type == EnforceType.SORT && property.getSort().getNodeId() == node.getPID()) {
           found = property;
-        } else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getPid() == node.getPID()) {
+        } else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getNodeId() == node.getPID()) {
           found = property;
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
index e2d7744..8128390 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/enforce/Enforcer.java
@@ -98,7 +98,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
   public void enforceJoinAlgorithm(int pid, JoinEnforce.JoinAlgorithm algorithm) {
     EnforceProperty.Builder builder = newProperty();
     JoinEnforce.Builder enforce = JoinEnforce.newBuilder();
-    enforce.setPid(pid);
+    enforce.setNodeId(pid);
     enforce.setAlgorithm(algorithm);
 
     builder.setType(EnforceType.JOIN);
@@ -109,7 +109,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
   public void enforceSortAggregation(int pid, @Nullable SortSpec[] sortSpecs) {
     EnforceProperty.Builder builder = newProperty();
     GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
-    enforce.setPid(pid);
+    enforce.setNodeId(pid);
     enforce.setAlgorithm(GroupbyAlgorithm.SORT_AGGREGATION);
     if (sortSpecs != null) {
       for (SortSpec sortSpec : sortSpecs) {
@@ -125,7 +125,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
   public void enforceHashAggregation(int pid) {
     EnforceProperty.Builder builder = newProperty();
     GroupbyEnforce.Builder enforce = GroupbyEnforce.newBuilder();
-    enforce.setPid(pid);
+    enforce.setNodeId(pid);
     enforce.setAlgorithm(GroupbyAlgorithm.HASH_AGGREGATION);
 
     builder.setType(EnforceType.GROUP_BY);
@@ -146,7 +146,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
                                          List<SortSpecArray> sortSpecArrays) {
     EnforceProperty.Builder builder = newProperty();
     DistinctGroupbyEnforcer.Builder enforce = DistinctGroupbyEnforcer.newBuilder();
-    enforce.setPid(pid);
+    enforce.setNodeId(pid);
     enforce.setIsMultipleAggregation(isMultipleAggregation);
     enforce.setAlgorithm(algorithm);
     if (sortSpecArrays != null) {
@@ -164,7 +164,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
   public void enforceSortAlgorithm(int pid, SortEnforce.SortAlgorithm algorithm) {
     EnforceProperty.Builder builder = newProperty();
     SortEnforce.Builder enforce = SortEnforce.newBuilder();
-    enforce.setPid(pid);
+    enforce.setNodeId(pid);
     enforce.setAlgorithm(algorithm);
 
     builder.setType(EnforceType.SORT);
@@ -203,7 +203,7 @@ public class Enforcer implements ProtoObject<EnforcerProto> {
   public void enforceColumnPartitionAlgorithm(int pid, ColumnPartitionAlgorithm algorithm) {
     EnforceProperty.Builder builder = newProperty();
     ColumnPartitionEnforcer.Builder enforce = ColumnPartitionEnforcer.newBuilder();
-    enforce.setPid(pid);
+    enforce.setNodeId(pid);
     enforce.setAlgorithm(algorithm);
 
     builder.setType(EnforceType.COLUMN_PARTITION);

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index c75b348..6c3e3b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.global;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -36,17 +37,20 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.BroadcastJoinMarkCandidateVisitor;
 import org.apache.tajo.engine.planner.BroadcastJoinPlanVisitor;
 import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder;
+import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine;
+import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.function.AggFunction;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.ReflectionUtil;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TajoWorker;
 
@@ -54,6 +58,7 @@ import java.io.IOException;
 import java.util.*;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.conf.TajoConf.ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS;
 import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*;
 
 /**
@@ -64,34 +69,29 @@ public class GlobalPlanner {
 
   private final TajoConf conf;
   private final CatalogProtos.StoreType storeType;
-  private CatalogService catalog;
-  private TajoWorker.WorkerContext workerContext;
+  private final CatalogService catalog;
+  private final GlobalPlanRewriteEngine rewriteEngine;
 
+  @VisibleForTesting
   public GlobalPlanner(final TajoConf conf, final CatalogService catalog) throws IOException {
     this.conf = conf;
     this.catalog = catalog;
     this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
     Preconditions.checkArgument(storeType != null);
+
+    Class<? extends GlobalPlanRewriteRuleProvider> clazz =
+        (Class<? extends GlobalPlanRewriteRuleProvider>) conf.getClassVar(GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS);
+    GlobalPlanRewriteRuleProvider provider = ReflectionUtil.newInstance(clazz, conf);
+    rewriteEngine = new GlobalPlanRewriteEngine();
+    rewriteEngine.addRewriteRule(provider.getRules());
   }
 
   public GlobalPlanner(final TajoConf conf, final TajoWorker.WorkerContext workerContext) throws IOException {
-    this.conf = conf;
-    this.workerContext = workerContext;
-    this.storeType = CatalogProtos.StoreType.valueOf(conf.getVar(ConfVars.SHUFFLE_FILE_FORMAT).toUpperCase());
-    Preconditions.checkArgument(storeType != null);
+    this(conf, workerContext.getCatalog());
   }
 
-  /**
-   * TODO: this is hack. it must be refactored at TAJO-602.
-   */
   public CatalogService getCatalog() {
-    if (workerContext.getCatalog() != null) {
-      return workerContext.getCatalog();
-    } else if (catalog != null) {
-      return catalog;
-    } else {
-      throw new IllegalStateException("No Catalog Instance");
-    }
+    return catalog;
   }
 
   public CatalogProtos.StoreType getStoreType() {
@@ -163,6 +163,8 @@ public class GlobalPlanner {
 
     masterPlan.setTerminal(terminalBlock);
     LOG.info("\n" + masterPlan.toString());
+
+    masterPlan = rewriteEngine.rewrite(masterPlan);
   }
 
   private static void setFinalOutputChannel(DataChannel outputChannel, Schema outputSchema) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
index 671bb19..5c6e80e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/builder/DistinctGroupbyBuilder.java
@@ -99,7 +99,7 @@ public class DistinctGroupbyBuilder {
       DistinctGroupbyNode thirdStageDistinctNode = PlannerUtil.clone(plan, baseDistinctNode);
 
       // Set second, third non-distinct aggregation's eval node to field eval
-      GroupbyNode lastGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(secondStageDistinctNode.getGroupByNodes().size() - 1);
+      GroupbyNode lastGroupbyNode = secondStageDistinctNode.getSubPlans().get(secondStageDistinctNode.getSubPlans().size() - 1);
       if (!lastGroupbyNode.isDistinct()) {
         int index = 0;
         for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
@@ -108,7 +108,7 @@ public class DistinctGroupbyBuilder {
           index++;
         }
       }
-      lastGroupbyNode = thirdStageDistinctNode.getGroupByNodes().get(thirdStageDistinctNode.getGroupByNodes().size() - 1);
+      lastGroupbyNode = thirdStageDistinctNode.getSubPlans().get(thirdStageDistinctNode.getSubPlans().size() - 1);
       if (!lastGroupbyNode.isDistinct()) {
         int index = 0;
         for (AggregationFunctionCallEval aggrFunction: lastGroupbyNode.getAggFunctions()) {
@@ -300,11 +300,11 @@ public class DistinctGroupbyBuilder {
 
     DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID());
     baseDistinctNode.setTargets(baseGroupByTargets.toArray(new Target[]{}));
-    baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns());
+    baseDistinctNode.setGroupingColumns(groupbyNode.getGroupingColumns());
     baseDistinctNode.setInSchema(groupbyNode.getInSchema());
     baseDistinctNode.setChild(groupbyNode.getChild());
 
-    baseDistinctNode.setGroupbyNodes(childGroupbyNodes);
+    baseDistinctNode.setSubPlans(childGroupbyNodes);
 
     return baseDistinctNode;
   }
@@ -468,11 +468,11 @@ public class DistinctGroupbyBuilder {
 
     DistinctGroupbyNode baseDistinctNode = new DistinctGroupbyNode(context.getPlan().getLogicalPlan().newPID());
     baseDistinctNode.setTargets(groupbyNode.getTargets());
-    baseDistinctNode.setGroupColumns(groupbyNode.getGroupingColumns());
+    baseDistinctNode.setGroupingColumns(groupbyNode.getGroupingColumns());
     baseDistinctNode.setInSchema(groupbyNode.getInSchema());
     baseDistinctNode.setChild(groupbyNode.getChild());
 
-    baseDistinctNode.setGroupbyNodes(childGroupbyNodes);
+    baseDistinctNode.setSubPlans(childGroupbyNodes);
 
     return baseDistinctNode;
   }
@@ -529,12 +529,12 @@ public class DistinctGroupbyBuilder {
     // - Change SecondStage's aggregation expr and target column name. For example:
     //     exprs: (sum(default.lineitem.l_quantity (FLOAT8))) ==> exprs: (sum(?sum_3 (FLOAT8)))
     int grpIdx = 0;
-    for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) {
-      GroupbyNode secondStageGroupbyNode = secondStageDistinctNode.getGroupByNodes().get(grpIdx);
+    for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getSubPlans()) {
+      GroupbyNode secondStageGroupbyNode = secondStageDistinctNode.getSubPlans().get(grpIdx);
 
       if (firstStageGroupbyNode.isDistinct()) {
         // FirstStage: Remove aggregation, Set target with only grouping columns
-        firstStageGroupbyNode.setAggFunctions(null);
+        firstStageGroupbyNode.setAggFunctions(PlannerUtil.EMPTY_AGG_FUNCS);
 
         List<Target> firstGroupbyTargets = new ArrayList<Target>();
         for (Column column : firstStageGroupbyNode.getGroupingColumns()) {
@@ -614,7 +614,7 @@ public class DistinctGroupbyBuilder {
 
     // In the case of distinct query without group by clause
     // other aggregation function is added to last distinct group by node.
-    List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getGroupByNodes();
+    List<GroupbyNode> secondStageGroupbyNodes = secondStageDistinctNode.getSubPlans();
     GroupbyNode lastSecondStageGroupbyNode = secondStageGroupbyNodes.get(secondStageGroupbyNodes.size() - 1);
     if (!lastSecondStageGroupbyNode.isDistinct() && lastSecondStageGroupbyNode.isEmptyGrouping()) {
       GroupbyNode otherGroupbyNode = lastSecondStageGroupbyNode;
@@ -644,7 +644,7 @@ public class DistinctGroupbyBuilder {
     List<Integer> firstStageColumnIds = new ArrayList<Integer>();
     columnIdIndex = 0;
     List<Target> firstTargets = new ArrayList<Target>();
-    for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+    for (GroupbyNode firstStageGroupbyNode: firstStageDistinctNode.getSubPlans()) {
       if (firstStageGroupbyNode.isDistinct()) {
         for (Column column : firstStageGroupbyNode.getGroupingColumns()) {
           Target firstTarget = new Target(new FieldEval(column));
@@ -674,7 +674,7 @@ public class DistinctGroupbyBuilder {
     Schema secondStageInSchema = new Schema();
     //TODO merged tuple schema
     int index = 0;
-    for(GroupbyNode eachNode: secondStageDistinctNode.getGroupByNodes()) {
+    for(GroupbyNode eachNode: secondStageDistinctNode.getSubPlans()) {
       eachNode.setInSchema(firstStageDistinctNode.getOutSchema());
       for (Column column: eachNode.getOutSchema().getColumns()) {
         if (secondStageInSchema.getColumn(column) == null) {
@@ -695,13 +695,13 @@ public class DistinctGroupbyBuilder {
 
     List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>();
     int index = 0;
-    for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+    for (GroupbyNode groupbyNode: firstStageDistinctNode.getSubPlans()) {
       List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>();
       for (Column column: groupbyNode.getGroupingColumns()) {
         sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build());
       }
       sortSpecArrays.add( SortSpecArray.newBuilder()
-          .setPid(secondStageDistinctNode.getGroupByNodes().get(index).getPID())
+          .setNodeId(secondStageDistinctNode.getSubPlans().get(index).getPID())
           .addAllSortSpecs(sortSpecs).build());
     }
     secondStageBlock.getEnforcer().enforceDistinctAggregation(secondStageDistinctNode.getPID(),
@@ -723,13 +723,13 @@ public class DistinctGroupbyBuilder {
 
     List<SortSpecArray> sortSpecArrays = new ArrayList<SortSpecArray>();
     int index = 0;
-    for (GroupbyNode groupbyNode: firstStageDistinctNode.getGroupByNodes()) {
+    for (GroupbyNode groupbyNode: firstStageDistinctNode.getSubPlans()) {
       List<SortSpecProto> sortSpecs = new ArrayList<SortSpecProto>();
       for (Column column: groupbyNode.getGroupingColumns()) {
         sortSpecs.add(SortSpecProto.newBuilder().setColumn(column.getProto()).build());
       }
       sortSpecArrays.add( SortSpecArray.newBuilder()
-          .setPid(thirdStageDistinctNode.getGroupByNodes().get(index).getPID())
+          .setNodeId(thirdStageDistinctNode.getSubPlans().get(index).getPID())
           .addAllSortSpecs(sortSpecs).build());
     }
     thirdStageBlock.getEnforcer().enforceDistinctAggregation(thirdStageDistinctNode.getPID(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java
new file mode 100644
index 0000000..96ee2c6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/BaseGlobalPlanRewriteRuleProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collection;
+import java.util.List;
+
+@SuppressWarnings("unused")
+public class BaseGlobalPlanRewriteRuleProvider extends GlobalPlanRewriteRuleProvider {
+  private static final List<Class<? extends GlobalPlanRewriteRule>> EMPTY_RULES = TUtil.newList();
+
+  public BaseGlobalPlanRewriteRuleProvider(TajoConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Collection<Class<? extends GlobalPlanRewriteRule>> getRules() {
+    return EMPTY_RULES;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java
new file mode 100644
index 0000000..c01ed0e
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteEngine.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.util.ReflectionUtil;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class GlobalPlanRewriteEngine {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(GlobalPlanRewriteEngine.class);
+
+  /** a map for query rewrite rules  */
+  private final Map<String, GlobalPlanRewriteRule> rewriteRules = new LinkedHashMap<String, GlobalPlanRewriteRule>();
+
+  /**
+   * Add a query rewrite rule to this engine.
+   *
+   * @param rules Rule classes
+   */
+  public void addRewriteRule(Iterable<Class<? extends GlobalPlanRewriteRule>> rules) {
+    for (Class<? extends GlobalPlanRewriteRule> clazz : rules) {
+      try {
+        GlobalPlanRewriteRule rule = ReflectionUtil.newInstance(clazz);
+        addRewriteRule(rule);
+      } catch (Throwable t) {
+        throw new RuntimeException(t);
+      }
+    }
+  }
+
+  /**
+   * Add a query rewrite rule to this engine.
+   *
+   * @param rule The rule to be added to this engine.
+   */
+  public void addRewriteRule(GlobalPlanRewriteRule rule) {
+    if (!rewriteRules.containsKey(rule.getName())) {
+      rewriteRules.put(rule.getName(), rule);
+    }
+  }
+
+  /**
+   * Rewrite a global plan with all query rewrite rules added to this engine.
+   *
+   * @param plan The plan to be rewritten with all query rewrite rule.
+   * @return The rewritten plan.
+   */
+  public MasterPlan rewrite(MasterPlan plan) throws PlanningException {
+    GlobalPlanRewriteRule rule;
+    for (Map.Entry<String, GlobalPlanRewriteRule> rewriteRule : rewriteRules.entrySet()) {
+      rule = rewriteRule.getValue();
+      if (rule.isEligible(plan)) {
+        plan = rule.rewrite(plan);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query.");
+        }
+      }
+    }
+
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java
new file mode 100644
index 0000000..4a37207
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRule.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter;
+
+import org.apache.tajo.engine.planner.global.MasterPlan;
+
+/**
+ * A rewrite rule for global plans
+ */
+public interface GlobalPlanRewriteRule {
+
+  /**
+   * Return rule name
+   * @return Rule name
+   */
+  public abstract String getName();
+
+  /**
+   * Check if this rule should be applied.
+   *
+   * @param plan Global Plan
+   * @return
+   */
+  public abstract boolean isEligible(MasterPlan plan);
+
+  /**
+   * Rewrite a global plan
+   *
+   * @param plan Global Plan
+   * @return
+   */
+  public abstract MasterPlan rewrite(MasterPlan plan);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java
new file mode 100644
index 0000000..638b5f3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanRewriteRuleProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter;
+
+import org.apache.tajo.conf.TajoConf;
+
+import java.util.Collection;
+
+public abstract class GlobalPlanRewriteRuleProvider {
+  protected final TajoConf conf;
+
+  public GlobalPlanRewriteRuleProvider(TajoConf conf) {
+    this.conf = conf;
+  }
+
+  public abstract Collection<Class<? extends GlobalPlanRewriteRule>> getRules();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java
new file mode 100644
index 0000000..dc91577
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/GlobalPlanTestRuleProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanEqualityTester;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * It is used only for test.
+ */
+@SuppressWarnings("unused")
+public class GlobalPlanTestRuleProvider extends BaseGlobalPlanRewriteRuleProvider {
+
+  public GlobalPlanTestRuleProvider(TajoConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Collection<Class<? extends GlobalPlanRewriteRule>> getRules() {
+    List<Class<? extends GlobalPlanRewriteRule>> injectedRules = Lists.newArrayList(super.getRules());
+    injectedRules.add(GlobalPlanEqualityTester.class);
+    return injectedRules;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
new file mode 100644
index 0000000..e2fd47f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/rewriter/rules/GlobalPlanEqualityTester.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global.rewriter.rules;
+
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRule;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
+import org.apache.tajo.plan.serder.LogicalNodeSerializer;
+import org.apache.tajo.plan.serder.PlanProto;
+
+/**
+ * It verifies the equality between the input and output of LogicalNodeTree(De)Serializer in global planning.
+ */
+public class GlobalPlanEqualityTester implements GlobalPlanRewriteRule {
+
+  @Override
+  public String getName() {
+    return "GlobalPlanEqualityTester";
+  }
+
+  @Override
+  public boolean isEligible(MasterPlan plan) {
+    return true;
+  }
+
+  @Override
+  public MasterPlan rewrite(MasterPlan plan) {
+    try {
+      ExecutionBlockCursor cursor = new ExecutionBlockCursor(plan);
+      while (cursor.hasNext()) {
+        ExecutionBlock eb = cursor.nextBlock();
+        LogicalNode node = eb.getPlan();
+        if (node != null) {
+          PlanProto.LogicalNodeTree tree = LogicalNodeSerializer.serialize(node);
+          LogicalNode deserialize = LogicalNodeDeserializer.deserialize(plan.getContext(), tree);
+          assert node.deepEquals(deserialize);
+        }
+      }
+      return plan;
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
index bd24fa3..aca4879 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyFirstAggregationExec.java
@@ -130,7 +130,7 @@ public class DistinctGroupbyFirstAggregationExec extends PhysicalExec {
     }
     resultTupleLength = groupingKeyIndexes.length + 1;  //1 is Sequence Datum which indicates sequence of DistinctNode.
 
-    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+    List<GroupbyNode> groupbyNodes = plan.getSubPlans();
 
     List<DistinctHashAggregator> distinctAggrList = new ArrayList<DistinctHashAggregator>();
     int distinctSeq = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
index eac5c70..37d61a9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyHashAggregationExec.java
@@ -76,7 +76,7 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec {
       distinctGroupingKeyIds[idx++] = intVal.intValue();
     }
 
-    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+    List<GroupbyNode> groupbyNodes = plan.getSubPlans();
     groupbyNodeNum = groupbyNodes.size();
     this.hashAggregators = new HashAggregator[groupbyNodeNum];
 
@@ -88,7 +88,7 @@ public class DistinctGroupbyHashAggregationExec extends PhysicalExec {
     outputColumnNum = plan.getOutSchema().size();
 
     int allGroupbyOutColNum = 0;
-    for (GroupbyNode eachGroupby: plan.getGroupByNodes()) {
+    for (GroupbyNode eachGroupby: plan.getSubPlans()) {
       allGroupbyOutColNum += eachGroupby.getOutSchema().size();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
index 383ccd3..cce9a24 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySecondAggregationExec.java
@@ -100,7 +100,7 @@ public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec {
 
     numGroupingColumns = plan.getGroupingColumns().length;
 
-    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+    List<GroupbyNode> groupbyNodes = plan.getSubPlans();
 
     // Finding distinct group by column index.
     Set<Integer> groupingKeyIndexSet = new HashSet<Integer>();

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
index 06b241c..6641633 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbySortAggregationExec.java
@@ -47,13 +47,13 @@ public class DistinctGroupbySortAggregationExec extends PhysicalExec {
     super(context, plan.getInSchema(), plan.getOutSchema());
     this.plan = plan;
     this.aggregateExecs = aggregateExecs;
-    this.groupbyNodeNum = plan.getGroupByNodes().size();
+    this.groupbyNodeNum = plan.getSubPlans().size();
 
     currentTuples = new Tuple[groupbyNodeNum];
     outColumnNum = outSchema.size();
 
     int allGroupbyOutColNum = 0;
-    for (GroupbyNode eachGroupby: plan.getGroupByNodes()) {
+    for (GroupbyNode eachGroupby: plan.getSubPlans()) {
       allGroupbyOutColNum += eachGroupby.getOutSchema().size();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
index ff6fc4a..a76b91d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -66,7 +66,7 @@ public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
     numGroupingColumns = plan.getGroupingColumns().length;
     resultTupleLength = numGroupingColumns;
 
-    List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+    List<GroupbyNode> groupbyNodes = plan.getSubPlans();
 
     List<DistinctFinalAggregator> aggregatorList = new ArrayList<DistinctFinalAggregator>();
     int inTupleIndex = 1 + numGroupingColumns;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
index a3e586a..2fa272a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
@@ -28,6 +28,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.util.List;
@@ -38,7 +39,7 @@ public interface TaskRequest extends ProtoObject<TajoWorkerProtocol.TaskRequestP
 	public List<CatalogProtos.FragmentProto> getFragments();
 	public String getOutputTableId();
 	public boolean isClusteredOutput();
-	public String getSerializedData();
+	public PlanProto.LogicalNodeTree getPlan();
 	public boolean isInterQuery();
 	public void setInterQuery();
 	public void addFetch(String name, FetchImpl fetch);

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
index cef5488..b4727dc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
@@ -25,6 +25,7 @@ import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
 import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProtoOrBuilder;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.worker.FetchImpl;
 
 import java.util.ArrayList;
@@ -39,7 +40,7 @@ public class TaskRequestImpl implements TaskRequest {
   private String outputTable;
 	private boolean isUpdated;
 	private boolean clusteredOutput;
-	private String serializedData;     // logical node
+	private PlanProto.LogicalNodeTree plan;     // logical node
 	private Boolean interQuery;
 	private List<FetchImpl> fetches;
   private Boolean shouldDie;
@@ -59,9 +60,10 @@ public class TaskRequestImpl implements TaskRequest {
 	
 	public TaskRequestImpl(TaskAttemptId id, List<FragmentProto> fragments,
 												 String outputTable, boolean clusteredOutput,
-												 String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
+												 PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel channel,
+												 Enforcer enforcer) {
 		this();
-		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer);
+		this.set(id, fragments, outputTable, clusteredOutput, plan, queryContext, channel, enforcer);
 	}
 	
 	public TaskRequestImpl(TaskRequestProto proto) {
@@ -73,12 +75,12 @@ public class TaskRequestImpl implements TaskRequest {
 	
 	public void set(TaskAttemptId id, List<FragmentProto> fragments,
 			String outputTable, boolean clusteredOutput,
-			String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
+			PlanProto.LogicalNodeTree plan, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
 		this.id = id;
 		this.fragments = fragments;
 		this.outputTable = outputTable;
 		this.clusteredOutput = clusteredOutput;
-		this.serializedData = serializedData;
+		this.plan = plan;
 		this.isUpdated = true;
     this.queryContext = queryContext;
     this.queryContext = queryContext;
@@ -150,16 +152,16 @@ public class TaskRequestImpl implements TaskRequest {
 	}
 
 	@Override
-	public String getSerializedData() {
+	public PlanProto.LogicalNodeTree getPlan() {
 		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (this.serializedData != null) {
-			return this.serializedData;
+		if (this.plan != null) {
+			return this.plan;
 		}
-		if (!p.hasSerializedData()) {
+		if (!p.hasPlan()) {
 			return null;
 		}
-		this.serializedData = p.getSerializedData();
-		return this.serializedData;
+		this.plan = p.getPlan();
+		return this.plan;
 	}
 
 	public boolean isInterQuery() {
@@ -292,8 +294,8 @@ public class TaskRequestImpl implements TaskRequest {
 		if (this.isUpdated) {
 			builder.setClusteredOutput(this.clusteredOutput);
 		}
-		if (this.serializedData != null) {
-			builder.setSerializedData(this.serializedData);
+		if (this.plan != null) {
+			builder.setPlan(this.plan);
 		}
 		if (this.interQuery != null) {
 		  builder.setInterQuery(this.interQuery);

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
index 9787276..29dc845 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
@@ -18,23 +18,25 @@
 
 package org.apache.tajo.engine.utils.test;
 
+import org.apache.tajo.OverridableConf;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 
-public class ErrorInjectionRewriter implements RewriteRule {
+@SuppressWarnings("unused")
+public class ErrorInjectionRewriter implements LogicalPlanRewriteRule {
   @Override
   public String getName() {
     return "ErrorInjectionRewriter";
   }
 
   @Override
-  public boolean isEligible(LogicalPlan plan) {
+  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
     return true;
   }
 
   @Override
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
     throw new NullPointerException();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index dd6233c..1cd6587 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -35,13 +35,15 @@ import org.apache.tajo.engine.query.TaskRequest;
 import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.Stage;
 import org.apache.tajo.master.querymaster.Task;
 import org.apache.tajo.master.querymaster.TaskAttempt;
-import org.apache.tajo.master.querymaster.Stage;
-import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.plan.serder.LogicalNodeSerializer;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.storage.DataLocation;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
@@ -125,7 +127,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     builder.setId(NULL_ATTEMPT_ID.getProto());
     builder.setShouldDie(true);
     builder.setOutputTable("");
-    builder.setSerializedData("");
+    builder.setPlan(PlanProto.LogicalNodeTree.newBuilder());
     builder.setClusteredOutput(false);
     stopTaskRunnerReq = builder.build();
   }
@@ -838,7 +840,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               new ArrayList<FragmentProto>(task.getAllFragments()),
               "",
               false,
-              task.getLogicalPlan().toJson(),
+              LogicalNodeSerializer.serialize(task.getLogicalPlan()),
               context.getMasterContext().getQueryContext(),
               stage.getDataChannel(), stage.getBlock().getEnforcer());
           if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) {
@@ -894,7 +896,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               Lists.newArrayList(task.getAllFragments()),
               "",
               false,
-              task.getLogicalPlan().toJson(),
+              LogicalNodeSerializer.serialize(task.getLogicalPlan()),
               context.getMasterContext().getQueryContext(),
               stage.getDataChannel(),
               stage.getBlock().getEnforcer());

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index d7e7670..51964f0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -94,6 +94,7 @@ public class GlobalEngine extends AbstractService {
       annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);
+      throw new RuntimeException(t);
     }
     super.start();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 3585ae7..10701f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -144,10 +144,10 @@ public class QueryExecutor {
 
       // others
     } else {
-      if (setSessionNode.isDefaultValue()) {
-        session.removeVariable(varName);
-      } else {
+      if (setSessionNode.hasValue()) {
         session.setVariable(varName, setSessionNode.getValue());
+      } else {
+        session.removeVariable(varName);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index e3d3d79..720d60a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -37,13 +37,11 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
-import org.apache.tajo.master.exec.prehook.InsertIntoHook;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
@@ -53,7 +51,6 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.master.TajoAsyncDispatcher;
 import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
@@ -380,10 +377,10 @@ public class QueryMasterTask extends CompositeService {
           if (tableDesc == null) {
             throw new VerifyException("Can't get table meta data from catalog: " + tableName);
           }
-          List<RewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+          List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
               getQueryTaskContext().getQueryContext(), tableDesc);
           if (storageSpecifiedRewriteRules != null) {
-            for (RewriteRule eachRule: storageSpecifiedRewriteRules) {
+            for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
               optimizer.addRuleAfterToJoinOpt(eachRule);
             }
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 70a3202..5f9c6ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -39,8 +39,8 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.engine.query.QueryContext;
@@ -124,7 +124,7 @@ public class Task {
     this.context.setEnforcer(request.getEnforcer());
     this.inputStats = new TableStats();
 
-    plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
+    plan = LogicalNodeDeserializer.deserialize(queryContext, request.getPlan());
     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
     if (scanNode != null) {
       for (LogicalNode node : scanNode) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index 5acbcd9..b8c9575 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -70,7 +70,7 @@ message TaskRequestProto {
     repeated FragmentProto fragments = 2;
     required string outputTable = 3;
     required bool clusteredOutput = 4;
-    required string serializedData = 5;
+    required LogicalNodeTree plan = 5;
     optional bool interQuery = 6 [default = false];
     repeated FetchProto fetches = 7;
     optional bool shouldDie = 8;
@@ -261,7 +261,7 @@ message JoinEnforce {
     MERGE_JOIN = 4;
   }
 
-  required int32 pid = 1;
+  required int32 nodeId = 1;
   required JoinAlgorithm algorithm = 2;
 }
 
@@ -271,7 +271,7 @@ message GroupbyEnforce {
     SORT_AGGREGATION = 1;
   }
 
-  required int32 pid = 1;
+  required int32 nodeId = 1;
   required GroupbyAlgorithm algorithm = 2;
   repeated SortSpecProto sortSpecs = 3;
 }
@@ -282,7 +282,7 @@ message SortEnforce {
     MERGE_SORT = 1;
   }
 
-  required int32 pid = 1;
+  required int32 nodeId = 1;
   required SortAlgorithm algorithm = 2;
 }
 
@@ -296,7 +296,7 @@ message ColumnPartitionEnforcer {
     SORT_PARTITION = 1;
   }
 
-  required int32 pid = 1;
+  required int32 nodeId = 1;
   required ColumnPartitionAlgorithm algorithm = 2;
 }
 
@@ -313,10 +313,10 @@ message DistinctGroupbyEnforcer {
   }
 
   message SortSpecArray {
-    required int32 pid = 1;
+    required int32 nodeId = 1;
     repeated SortSpecProto sortSpecs = 2;
   }
-  required int32 pid = 1;
+  required int32 nodeId = 1;
   required DistinctAggregationAlgorithm algorithm = 2;
   repeated SortSpecArray sortSpecArrays = 3;
   required bool isMultipleAggregation = 4 [default = false];

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 7dc1089..5ff637c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -42,12 +42,14 @@ import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.Query;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.Stage;
 import org.apache.tajo.master.querymaster.StageState;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
@@ -57,10 +59,7 @@ import java.io.*;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.sql.ResultSet;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.UUID;
+import java.util.*;
 
 public class TajoTestingCluster {
 	private static Log LOG = LogFactory.getLog(TajoTestingCluster.class);
@@ -119,10 +118,18 @@ public class TajoTestingCluster {
   }
 
   void initPropertiesAndConfigs() {
+
+    // Set time zone
     TimeZone testDefaultTZ = TimeZone.getTimeZone(TajoConstants.DEFAULT_SYSTEM_TIMEZONE);
     conf.setSystemTimezone(testDefaultTZ);
     TimeZone.setDefault(testDefaultTZ);
 
+    // Injection of equality testing code of logical plan (de)serialization
+    conf.setClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, LogicalPlanTestRuleProvider.class);
+    conf.setClassVar(ConfVars.GLOBAL_PLAN_REWRITE_RULE_PROVIDER_CLASS, GlobalPlanTestRuleProvider.class);
+
+
+    // default resource manager
     if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
       String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
       Preconditions.checkState(testResourceManager.equals(TajoWorkerResourceManager.class.getCanonicalName()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index e286b92..4e4b710 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -38,8 +38,8 @@ import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.serder.EvalTreeProtoDeserializer;
-import org.apache.tajo.plan.serder.EvalTreeProtoSerializer;
+import org.apache.tajo.plan.serder.EvalNodeDeserializer;
+import org.apache.tajo.plan.serder.EvalNodeSerializer;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.serder.PlanProto;
@@ -62,7 +62,9 @@ import java.util.TimeZone;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public class ExprTestBase {
   private static TajoTestingCluster util;
@@ -141,7 +143,7 @@ public class ExprTestBase {
       assertFalse(state.getErrorMessages().get(0), true);
     }
     LogicalPlan plan = planner.createPlan(context, expr, true);
-    optimizer.optimize(plan);
+    optimizer.optimize(context, plan);
     annotatedPlanVerifier.verify(context, state, plan);
 
     if (state.getErrorMessages().size() > 0) {
@@ -318,7 +320,7 @@ public class ExprTestBase {
   }
 
   public static void assertEvalTreeProtoSerDer(OverridableConf context, EvalNode evalNode) {
-    PlanProto.EvalTree converted = EvalTreeProtoSerializer.serialize(evalNode);
-    assertEquals(evalNode, EvalTreeProtoDeserializer.deserialize(context, converted));
+    PlanProto.EvalNodeTree converted = EvalNodeSerializer.serialize(evalNode);
+    assertEquals(evalNode, EvalNodeDeserializer.deserialize(context, converted));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
index bfd1700..794c14f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java
@@ -344,55 +344,84 @@ public class TestGroupByQuery extends QueryTestCaseBase {
   }
 
   @Test
-  public final void testDistinctAggregationCasebyCase() throws Exception {
-    ResultSet res;
-
+  public final void testDistinctAggregationCasebyCase1() throws Exception {
     // one groupby, distinct, aggregation
-    res = executeFile("testDistinctAggregation_case1.sql");
+    ResultSet res = executeFile("testDistinctAggregation_case1.sql");
     assertResultSet(res, "testDistinctAggregation_case1.result");
     res.close();
+  }
 
+  @Test
+  public final void testDistinctAggregationCasebyCase2() throws Exception {
     // one groupby, two distinct, one aggregation
-    res = executeFile("testDistinctAggregation_case2.sql");
+    ResultSet res = executeFile("testDistinctAggregation_case2.sql");
     assertResultSet(res, "testDistinctAggregation_case2.result");
     res.close();
+  }
 
+  @Test
+  public final void testDistinctAggregationCasebyCase3() throws Exception {
     // one groupby, two distinct, two aggregation(no alias)
-    res = executeFile("testDistinctAggregation_case3.sql");
+    ResultSet res = executeFile("testDistinctAggregation_case3.sql");
     assertResultSet(res, "testDistinctAggregation_case3.result");
     res.close();
+  }
 
+  @Test
+  public final void testDistinctAggregationCasebyCase4() throws Exception {
     // two groupby, two distinct, two aggregation
-    res = executeFile("testDistinctAggregation_case4.sql");
+    ResultSet res = executeFile("testDistinctAggregation_case4.sql");
     assertResultSet(res, "testDistinctAggregation_case4.result");
     res.close();
+  }
 
+  @Test
+  public final void testDistinctAggregationCasebyCase5() throws Exception {
     // two groupby, two distinct, two aggregation with stage
-    res = executeFile("testDistinctAggregation_case5.sql");
+    ResultSet res = executeFile("testDistinctAggregation_case5.sql");
     assertResultSet(res, "testDistinctAggregation_case5.result");
     res.close();
+  }
 
-    res = executeFile("testDistinctAggregation_case6.sql");
+  @Test
+  public final void testDistinctAggregationCasebyCase6() throws Exception {
+    ResultSet res = executeFile("testDistinctAggregation_case6.sql");
     assertResultSet(res, "testDistinctAggregation_case6.result");
     res.close();
+  }
 
-    res = executeFile("testDistinctAggregation_case7.sql");
+  @Test
+  public final void testDistinctAggregationCasebyCase7() throws Exception {
+    ResultSet res = executeFile("testDistinctAggregation_case7.sql");
     assertResultSet(res, "testDistinctAggregation_case7.result");
     res.close();
+  }
 
-    res = executeFile("testDistinctAggregation_case8.sql");
+  @Test
+  public final void testDistinctAggregationCasebyCase8() throws Exception {
+    ResultSet res = executeFile("testDistinctAggregation_case8.sql");
     assertResultSet(res, "testDistinctAggregation_case8.result");
     res.close();
+  }
 
-    res = executeFile("testDistinctAggregation_case9.sql");
+  @Test
+  public final void testDistinctAggregationCasebyCase9() throws Exception {
+    ResultSet res = executeFile("testDistinctAggregation_case9.sql");
     assertResultSet(res, "testDistinctAggregation_case9.result");
     res.close();
+  }
 
-    res = executeFile("testDistinctAggregation_case10.sql");
+  @Test
+  public final void testDistinctAggregationCasebyCase10() throws Exception {
+    ResultSet res = executeFile("testDistinctAggregation_case10.sql");
     assertResultSet(res, "testDistinctAggregation_case10.result");
     res.close();
+  }
+
+  @Test
+  public final void testDistinctAggregationCasebyCase11() throws Exception {
+    ResultSet res;
 
-    // case9
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
@@ -417,7 +446,7 @@ public class TestGroupByQuery extends QueryTestCaseBase {
 
     assertEquals(expected, resultSetToString(res));
 
-  // multiple distinct with expression
+    // multiple distinct with expression
     res = executeString(
         "select count(distinct code) + count(distinct qty) from table10"
     );


Mime
View raw message