tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [20/35] TAJO-1125: Separate logical plan and optimizer into a maven module.
Date Sun, 26 Oct 2014 19:27:26 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
index 239dabf..ff6fc4a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
@@ -21,12 +21,11 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
-import org.apache.tajo.engine.function.FunctionContext;
-import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.plan.logical.DistinctGroupbyNode;
+import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index b1ab7c4..4e02e67 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.logical.EvalExprNode;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.logical.EvalExprNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index cfd7fb7..4c0caea 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -36,7 +36,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.PhysicalPlanningException;
-import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.Scanner;
 import org.apache.tajo.storage.fragment.FileFragment;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index 3323d1f..80bba2b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.function.FunctionContext;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index e27a43d..c28a5cd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -21,7 +21,7 @@ package org.apache.tajo.engine.planner.physical;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.plan.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.StringUtils;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 9dabbb3..cf333b0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -20,12 +20,12 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.codegen.CompilationError;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index 426a7a1..18be6b9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -20,11 +20,11 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 50a1438..236f5e3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index b752db5..c38be88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -22,14 +22,14 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.AlgebraicUtil;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.plan.expr.AlgebraicUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 4fdd03a..5196a63 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index 47fcb8d..5bf80fd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -26,7 +26,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.planner.logical.ShuffleFileWriteNode;
+import org.apache.tajo.plan.logical.ShuffleFileWriteNode;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.HashShuffleAppender;
 import org.apache.tajo.storage.HashShuffleAppenderManager;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index 0418f65..f9f4351 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.logical.HavingNode;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.HavingNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
index d736c25..14e2366 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/LimitExec.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.planner.logical.LimitNode;
+import org.apache.tajo.plan.logical.LimitNode;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
index 9f4f20a..13fec7b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MemSortExec.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.planner.logical.SortNode;
+import org.apache.tajo.plan.logical.SortNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index e1cc6a8..cb2552b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -20,11 +20,10 @@ package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
@@ -84,7 +83,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
 
     this.joincomparator = new JoinTupleComparator(leftChild.getSchema(),
         rightChild.getSchema(), sortSpecs);
-    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+    this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual(
         plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
 
     // for projection

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index bbfe973..20df210 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -20,10 +20,9 @@ package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
@@ -78,7 +77,7 @@ public class MergeJoinExec extends BinaryPhysicalExec {
 
     this.joincomparator = new JoinTupleComparator(outer.getSchema(),
         inner.getSchema(), sortSpecs);
-    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+    this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual(
         plan.getJoinQual(), outer.getSchema(), inner.getSchema());
     this.outerIterator = outerTupleSlots.iterator();
     this.innerIterator = innerTupleSlots.iterator();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index dc061ed..b5c6244 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 37ef7df..8ff7570 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -18,10 +18,10 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 4c72075..0569c1b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -21,8 +21,8 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.collect.Lists;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.PlannerUtil;
-import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index 2f55cf7..1b824b5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -18,15 +18,34 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.PhysicalPlanningException;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.PersistentStoreNode;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class PhysicalPlanUtil {
   public static <T extends PhysicalExec> T findExecutor(PhysicalExec plan, Class<? extends PhysicalExec> clazz)
@@ -34,6 +53,106 @@ public class PhysicalPlanUtil {
     return (T) new FindVisitor().visit(plan, new Stack<PhysicalExec>(), clazz);
   }
 
+  public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(joinQual, leftSchema, rightSchema);
+    TupleComparator[] comparators = new TupleComparator[2];
+    comparators[0] = new TupleComparator(leftSchema, sortSpecs[0]);
+    comparators[1] = new TupleComparator(rightSchema, sortSpecs[1]);
+    return comparators;
+  }
+
+  /**
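+   * Lists the non-empty data files of a table and returns them as fragments.
+   * If the table is partitioned, only files that share the first file's partition key are returned.
+   * @param tajoConf configuration used to resolve the file system of the table path
+   * @param tableDesc descriptor of the table whose data files are listed
+   * @param fileIndex index of the first non-empty file to include; earlier non-empty files are skipped
+   * @param numResultFiles maximum number of files to collect
+   * @return the selected files as an array of fragment protos
+   * @throws java.io.IOException if a file system operation fails
+   */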
+  public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc,
+                                                          int fileIndex, int numResultFiles) throws IOException {
+    FileSystem fs = tableDesc.getPath().getFileSystem(tajoConf);
+
+    List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
+    if (fs.exists(tableDesc.getPath())) {
+      getNonZeroLengthDataFiles(fs, tableDesc.getPath(), nonZeroLengthFiles, fileIndex, numResultFiles,
+          new AtomicInteger(0));
+    }
+
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+
+    // For a partitioned table, return only the data files that share the same partition key.
+    int numPartitionColumns = 0;
+    if (tableDesc.hasPartition()) {
+      numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
+    }
+    String[] previousPartitionPathNames = null;
+    for (FileStatus eachFile: nonZeroLengthFiles) {
+      FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null);
+
+      if (numPartitionColumns > 0) {
+        // Find the partition key from the names of the enclosing partition directories.
+        Path filePath = fileFragment.getPath();
+        Path parentPath = filePath;
+        String[] parentPathNames = new String[numPartitionColumns];
+        for (int i = 0; i < numPartitionColumns; i++) {
+          parentPath = parentPath.getParent();
+          parentPathNames[numPartitionColumns - i - 1] = parentPath.getName();
+        }
+
+        // If current partitionKey == previousPartitionKey, add to result.
+        if (previousPartitionPathNames == null) {
+          fragments.add(fileFragment);
+        } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) {
+          fragments.add(fileFragment);
+        } else {
+          break;
+        }
+        previousPartitionPathNames = parentPathNames;
+      } else {
+        fragments.add(fileFragment);
+      }
+    }
+    return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[]{}));
+  }
+
+  private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
+                                         int startFileIndex, int numResultFiles,
+                                         AtomicInteger currentFileIndex) throws IOException {
+    if (fs.isDirectory(path)) {
+      FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter);
+      if (files != null && files.length > 0) {
+        for (FileStatus eachFile : files) {
+          if (result.size() >= numResultFiles) {
+            return;
+          }
+          if (eachFile.isDirectory()) {
+            getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles,
+                currentFileIndex);
+          } else if (eachFile.isFile() && eachFile.getLen() > 0) {
+            if (currentFileIndex.get() >= startFileIndex) {
+              result.add(eachFile);
+            }
+            currentFileIndex.incrementAndGet();
+          }
+        }
+      }
+    } else {
+      FileStatus fileStatus = fs.getFileStatus(path);
+      if (fileStatus != null && fileStatus.getLen() > 0) {
+        if (currentFileIndex.get() >= startFileIndex) {
+          result.add(fileStatus);
+        }
+        currentFileIndex.incrementAndGet();
+        if (result.size() >= numResultFiles) {
+          return;
+        }
+      }
+    }
+  }
+
   private static class FindVisitor extends BasicPhysicalExecutorVisitor<Class<? extends PhysicalExec>, PhysicalExec> {
     public PhysicalExec visit(PhysicalExec exec, Stack<PhysicalExec> stack, Class<? extends PhysicalExec> target)
         throws PhysicalPlanningException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index 89cd75a..72a667d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -22,7 +22,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.Projectable;
+import org.apache.tajo.plan.logical.Projectable;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 4e5d1cf..dd72910 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.worker.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 5d4dad5..9d8122f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -21,11 +21,10 @@ package org.apache.tajo.engine.planner.physical;
 import com.google.common.base.Preconditions;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
@@ -83,7 +82,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
     sortSpecs[1] = innerSortKey;
 
     this.joinComparator = new JoinTupleComparator(outer.getSchema(), inner.getSchema(), sortSpecs);
-    this.tupleComparator = PlannerUtil.getComparatorsFromJoinQual(
+    this.tupleComparator = PhysicalPlanUtil.getComparatorsFromJoinQual(
         plan.getJoinQual(), outer.getSchema(), inner.getSchema());
 
     // for projection

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 5ae9a8f..9e84462 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.engine.codegen.CompilationError;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.planner.logical.SelectionNode;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.logical.SelectionNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c7f8e2d..c6a4f55 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -27,17 +27,17 @@ import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.codegen.CompilationError;
-import org.apache.tajo.engine.eval.ConstEval;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.engine.utils.TupleCache;
 import org.apache.tajo.engine.utils.TupleCacheKey;
-import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.plan.util.SchemaUtil;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.ConstEval;
+import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
+import org.apache.tajo.plan.expr.FieldEval;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
@@ -118,7 +118,8 @@ public class SeqScanExec extends PhysicalExec {
 
     // Get a partition key value from a given path
     Tuple partitionRow =
-        TupleUtil.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), false);
+        PartitionedTableRewriter.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(),
+            false);
 
     // Targets or search conditions may contain column references.
     // However, actual values absent in tuples. So, Replace all column references by constant datum.

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
index c4d43a3..425eb86 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.function.FunctionContext;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 6084f0e..f7c20fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -23,7 +23,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.plan.logical.StoreTableNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.StringUtils;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index b496e42..0dc172c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -26,8 +26,8 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.engine.planner.logical.InsertNode;
-import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
+import org.apache.tajo.plan.logical.InsertNode;
+import org.apache.tajo.plan.logical.PersistentStoreNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
index 497c6d3..0383ae7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
@@ -22,7 +22,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.plan.InvalidQueryException;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
index 7aeed13..3f4b22b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
@@ -23,10 +23,10 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.WindowFunctionEval;
-import org.apache.tajo.engine.function.FunctionContext;
-import org.apache.tajo.engine.planner.logical.WindowAggNode;
-import org.apache.tajo.engine.planner.logical.WindowSpec;
+import org.apache.tajo.plan.expr.WindowFunctionEval;
+import org.apache.tajo.plan.function.FunctionContext;
+import org.apache.tajo.plan.logical.WindowAggNode;
+import org.apache.tajo.plan.logical.WindowSpec;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java
deleted file mode 100644
index 6b3ed1e..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.PlanningException;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This is a basic query rewrite rule engine. This rewrite rule engine
- * rewrites a logical plan with various query rewrite rules.
- */
-public class BasicQueryRewriteEngine implements QueryRewriteEngine {
-  /** class logger */
-  private Log LOG = LogFactory.getLog(BasicQueryRewriteEngine.class);
-
-  /** a map for query rewrite rules  */
-  private Map<String, RewriteRule> rewriteRules = new LinkedHashMap<String, RewriteRule>();
-
-  /**
-   * Add a query rewrite rule to this engine.
-   *
-   * @param rule The rule to be added to this engine.
-   */
-  public void addRewriteRule(RewriteRule rule) {
-    if (!rewriteRules.containsKey(rule.getName())) {
-      rewriteRules.put(rule.getName(), rule);
-    }
-  }
-
-  /**
-   * Rewrite a logical plan with all query rewrite rules added to this engine.
-   *
-   * @param plan The plan to be rewritten with all query rewrite rule.
-   * @return The rewritten plan.
-   */
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-    RewriteRule rule;
-    for (Entry<String, RewriteRule> rewriteRule : rewriteRules.entrySet()) {
-      rule = rewriteRule.getValue();
-      if (rule.isEligible(plan)) {
-        plan = rule.rewrite(plan);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query.");
-        }
-      }
-    }
-
-    return plan;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
deleted file mode 100644
index b5dafcb..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ /dev/null
@@ -1,910 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.rewrite;
-
-import com.google.common.collect.*;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.algebra.JoinType;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.exception.InvalidQueryException;
-import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule.FilterPushDownContext;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-/**
- * This rule tries to push down all filter conditions into logical nodes as lower as possible.
- * It is likely to significantly reduces the intermediate data.
- */
-public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownContext, LogicalNode>
-    implements RewriteRule {
-  private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class);
-  private static final String NAME = "FilterPushDown";
-
-  static class FilterPushDownContext {
-    Set<EvalNode> pushingDownFilters = new HashSet<EvalNode>();
-
-    public void clear() {
-      pushingDownFilters.clear();
-    }
-    public void setFiltersTobePushed(Collection<EvalNode> workingEvals) {
-      this.pushingDownFilters.clear();
-      this.pushingDownFilters.addAll(workingEvals);
-    }
-    public void addFiltersTobePushed(Collection<EvalNode> workingEvals) {
-      this.pushingDownFilters.addAll(workingEvals);
-    }
-
-    public void setToOrigin(Map<EvalNode, EvalNode> evalMap) {
-      //evalMap: copy -> origin
-      List<EvalNode> origins = new ArrayList<EvalNode>();
-      for (EvalNode eval : pushingDownFilters) {
-        EvalNode origin = evalMap.get(eval);
-        if (origin != null) {
-          origins.add(origin);
-        }
-      }
-      setFiltersTobePushed(origins);
-    }
-  }
-
-  @Override
-  public String getName() {
-    return NAME;
-  }
-
-  @Override
-  public boolean isEligible(LogicalPlan plan) {
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
-      if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-    /*
-    FilterPushDown rule: processing when visits each node
-      - If a target which is corresponding on a filter EvalNode's column is not FieldEval, do not PushDown.
-      - Replace filter EvalNode's column with child node's output column.
-        If there is no child node's output column, do not PushDown.
-      - When visit ScanNode, add filter eval to ScanNode's qual
-      - When visit GroupByNode, Find aggregation column in a filter EvalNode and
-        . If a parent is HavingNode, add filter eval to parent HavingNode.
-        . It not, create new HavingNode and set parent's child.
-     */
-    FilterPushDownContext context = new FilterPushDownContext();
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
-      context.clear();
-      this.visit(context, plan, block, block.getRoot(), new Stack<LogicalNode>());
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("=============================================");
-      LOG.debug("FilterPushDown Optimized Query: \n" + plan.toString());
-      LOG.debug("=============================================");
-    }
-    return plan;
-  }
-
-  @Override
-  public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                 SelectionNode selNode, Stack<LogicalNode> stack) throws PlanningException {
-    context.pushingDownFilters.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
-
-    stack.push(selNode);
-    visit(context, plan, block, selNode.getChild(), stack);
-    stack.pop();
-
-    if(context.pushingDownFilters.size() == 0) {
-      // remove the selection operator if there is no search condition after selection push.
-      LogicalNode node = stack.peek();
-      if (node instanceof UnaryNode) {
-        UnaryNode unary = (UnaryNode) node;
-        unary.setChild(selNode.getChild());
-      } else {
-        throw new InvalidQueryException("Unexpected Logical Query Plan");
-      }
-    } else { // if there remain search conditions
-
-      // check if it can be evaluated here
-      Set<EvalNode> matched = TUtil.newHashSet();
-      for (EvalNode eachEval : context.pushingDownFilters) {
-        if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
-          matched.add(eachEval);
-        }
-      }
-
-      // if there are search conditions which can be evaluated here,
-      // push down them and remove them from context.pushingDownFilters.
-      if (matched.size() > 0) {
-        selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(matched.toArray(new EvalNode[matched.size()])));
-        context.pushingDownFilters.removeAll(matched);
-      }
-    }
-
-    return selNode;
-  }
-
-  @Override
-  public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
-                               Stack<LogicalNode> stack) throws PlanningException {
-    // here we should stop selection pushdown on the null supplying side(s) of an outer join
-    // get the two operands of the join operation as well as the join type
-    JoinType joinType = joinNode.getJoinType();
-    EvalNode joinQual = joinNode.getJoinQual();
-    if (joinQual != null && LogicalPlanner.isOuterJoin(joinType)) {
-      BinaryEval binaryEval = (BinaryEval) joinQual;
-      // if both are fields
-      if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
-          binaryEval.getRightExpr().getType() == EvalType.FIELD) {
-
-        String leftTableName = ((FieldEval) binaryEval.getLeftExpr()).getQualifier();
-        String rightTableName = ((FieldEval) binaryEval.getRightExpr()).getQualifier();
-        List<String> nullSuppliers = Lists.newArrayList();
-        Set<String> leftTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
-            joinNode.getLeftChild()));
-        Set<String> rightTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
-            joinNode.getRightChild()));
-
-        // some verification
-        if (joinType == JoinType.FULL_OUTER) {
-          nullSuppliers.add(leftTableName);
-          nullSuppliers.add(rightTableName);
-
-          // verify that these null suppliers are indeed in the left and right sets
-          if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-          if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-
-        } else if (joinType == JoinType.LEFT_OUTER) {
-          nullSuppliers.add(((RelationNode)joinNode.getRightChild()).getCanonicalName());
-          //verify that this null supplier is indeed in the right sub-tree
-          if (!rightTableSet.contains(nullSuppliers.get(0))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-        } else if (joinType == JoinType.RIGHT_OUTER) {
-          if (((RelationNode)joinNode.getRightChild()).getCanonicalName().equals(rightTableName)) {
-            nullSuppliers.add(leftTableName);
-          } else {
-            nullSuppliers.add(rightTableName);
-          }
-
-          // verify that this null supplier is indeed in the left sub-tree
-          if (!leftTableSet.contains(nullSuppliers.get(0))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-        }
-      }
-    }
-
-    // get evals from ON clause
-    List<EvalNode> onConditions = new ArrayList<EvalNode>();
-    if (joinNode.hasJoinQual()) {
-      onConditions.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
-    }
-
-    boolean isTopMostJoin = stack.peek().getType() != NodeType.JOIN;
-
-    List<EvalNode> outerJoinPredicationEvals = new ArrayList<EvalNode>();
-    List<EvalNode> outerJoinFilterEvalsExcludePredication = new ArrayList<EvalNode>();
-    if (LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
-      // TAJO-853
-      // In the case of top most JOIN, all filters except JOIN condition aren't pushed down.
-      // That filters are processed by SELECTION NODE.
-      Set<String> nullSupplyingTableNameSet;
-      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
-        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
-      } else {
-        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
-      }
-
-      Set<String> preservedTableNameSet;
-      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
-        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
-      } else {
-        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
-      }
-
-      List<EvalNode> removedFromFilter = new ArrayList<EvalNode>();
-      for (EvalNode eachEval: context.pushingDownFilters) {
-        if (EvalTreeUtil.isJoinQual(block, eachEval, true)) {
-          outerJoinPredicationEvals.add(eachEval);
-          removedFromFilter.add(eachEval);
-        } else {
-          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachEval);
-          boolean canPushDown = true;
-          for (Column eachColumn: columns) {
-            if (nullSupplyingTableNameSet.contains(eachColumn.getQualifier())) {
-              canPushDown = false;
-              break;
-            }
-          }
-          if (!canPushDown) {
-            outerJoinFilterEvalsExcludePredication.add(eachEval);
-            removedFromFilter.add(eachEval);
-          }
-        }
-      }
-
-      context.pushingDownFilters.removeAll(removedFromFilter);
-
-      for (EvalNode eachOnEval: onConditions) {
-        if (EvalTreeUtil.isJoinQual(eachOnEval, true)) {
-          // If join condition, processing in the JoinNode.
-          outerJoinPredicationEvals.add(eachOnEval);
-        } else {
-          // If Eval has a column which belong to Preserved Row table, not using to push down but using JoinCondition
-          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachOnEval);
-          boolean canPushDown = true;
-          for (Column eachColumn: columns) {
-            if (preservedTableNameSet.contains(eachColumn.getQualifier())) {
-              canPushDown = false;
-              break;
-            }
-          }
-          if (canPushDown) {
-            context.pushingDownFilters.add(eachOnEval);
-          } else {
-            outerJoinPredicationEvals.add(eachOnEval);
-          }
-        }
-      }
-    } else {
-      context.pushingDownFilters.addAll(onConditions);
-    }
-
-    LogicalNode left = joinNode.getLeftChild();
-    LogicalNode right = joinNode.getRightChild();
-
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
-    // Join's input schema = right child output columns + left child output columns
-    Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, block, joinNode, left, notMatched,
-        null, true, 0);
-    context.setFiltersTobePushed(transformedMap.keySet());
-    visit(context, plan, block, left, stack);
-
-    context.setToOrigin(transformedMap);
-    context.addFiltersTobePushed(notMatched);
-
-    notMatched.clear();
-    transformedMap = findCanPushdownAndTransform(context, block, joinNode, right, notMatched, null, true,
-        left.getOutSchema().size());
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
-
-    visit(context, plan, block, right, stack);
-
-    context.setToOrigin(transformedMap);
-    context.addFiltersTobePushed(notMatched);
-
-    notMatched.clear();
-    List<EvalNode> matched = Lists.newArrayList();
-    if(LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
-      matched.addAll(outerJoinPredicationEvals);
-    } else {
-      for (EvalNode eval : context.pushingDownFilters) {
-        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, eval, joinNode, isTopMostJoin)) {
-          matched.add(eval);
-        }
-      }
-    }
-
-    EvalNode qual = null;
-    if (matched.size() > 1) {
-      // merged into one eval tree
-      qual = AlgebraicUtil.createSingletonExprFromCNF(
-          matched.toArray(new EvalNode[matched.size()]));
-    } else if (matched.size() == 1) {
-      // if the number of matched expr is one
-      qual = matched.get(0);
-    }
-
-    if (qual != null) {
-      joinNode.setJoinQual(qual);
-
-      if (joinNode.getJoinType() == JoinType.CROSS) {
-        joinNode.setJoinType(JoinType.INNER);
-      }
-      context.pushingDownFilters.removeAll(matched);
-    }
-
-    context.pushingDownFilters.addAll(outerJoinFilterEvalsExcludePredication);
-    return joinNode;
-  }
-
-  private Map<EvalNode, EvalNode> transformEvalsWidthByPassNode(
-      Collection<EvalNode> originEvals, LogicalPlan plan,
-      LogicalPlan.QueryBlock block,
-      LogicalNode node, LogicalNode childNode) throws PlanningException {
-    // transformed -> pushingDownFilters
-    Map<EvalNode, EvalNode> transformedMap = new HashMap<EvalNode, EvalNode>();
-
-    if (originEvals.isEmpty()) {
-      return transformedMap;
-    }
-
-    if (node.getType() == NodeType.UNION) {
-      // If node is union, All eval's columns are simple name and matched with child's output schema.
-      Schema childOutSchema = childNode.getOutSchema();
-      for (EvalNode eval : originEvals) {
-        EvalNode copy;
-        try {
-          copy = (EvalNode) eval.clone();
-        } catch (CloneNotSupportedException e) {
-          throw new PlanningException(e);
-        }
-
-        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
-        for (Column c : columns) {
-          Column column = childOutSchema.getColumn(c.getSimpleName());
-          if (column == null) {
-            throw new PlanningException(
-                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
-                    + c.getQualifiedName() + " for FilterPushDown(" + eval + "), " +
-                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")");
-          }
-          EvalTreeUtil.changeColumnRef(copy, c.getSimpleName(), column.getQualifiedName());
-        }
-
-        transformedMap.put(copy, eval);
-      }
-      return transformedMap;
-    }
-
-    if (childNode.getType() == NodeType.UNION) {
-      // If child is union, remove qualifier from eval's column
-      for (EvalNode eval : originEvals) {
-        EvalNode copy;
-        try {
-          copy = (EvalNode) eval.clone();
-        } catch (CloneNotSupportedException e) {
-          throw new PlanningException(e);
-        }
-
-        Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
-        for (Column c : columns) {
-          if (c.hasQualifier()) {
-            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), c.getSimpleName());
-          }
-        }
-
-        transformedMap.put(copy, eval);
-      }
-
-      return transformedMap;
-    }
-
-    // node in column -> child out column
-    Map<String, String> columnMap = new HashMap<String, String>();
-
-    for (int i = 0; i < node.getInSchema().size(); i++) {
-      String inColumnName = node.getInSchema().getColumn(i).getQualifiedName();
-      Column childOutColumn = childNode.getOutSchema().getColumn(i);
-      columnMap.put(inColumnName, childOutColumn.getQualifiedName());
-    }
-
-    // Rename from upper block's one to lower block's one
-    for (EvalNode matchedEval : originEvals) {
-      EvalNode copy;
-      try {
-        copy = (EvalNode) matchedEval.clone();
-      } catch (CloneNotSupportedException e) {
-        throw new PlanningException(e);
-      }
-
-      Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
-      boolean allMatched = true;
-      for (Column c : columns) {
-        if (columnMap.containsKey(c.getQualifiedName())) {
-          EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), columnMap.get(c.getQualifiedName()));
-        } else {
-          if (childNode.getType() == NodeType.GROUP_BY) {
-            if (((GroupbyNode) childNode).isAggregationColumn(c.getSimpleName())) {
-              allMatched = false;
-              break;
-            }
-          } else {
-            throw new PlanningException(
-                "Invalid Filter PushDown on SubQuery: No such a corresponding column '"
-                    + c.getQualifiedName() + " for FilterPushDown(" + matchedEval + "), " +
-                    "(PID=" + node.getPID() + ", Child=" + childNode.getPID() + ")"
-            );
-          }
-        }
-      }
-      if (allMatched) {
-        transformedMap.put(copy, matchedEval);
-      }
-    }
-
-    return transformedMap;
-  }
-
-  @Override
-  public LogicalNode visitTableSubQuery(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
-                                        TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
-    List<EvalNode> matched = Lists.newArrayList();
-    for (EvalNode eval : context.pushingDownFilters) {
-      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, node)) {
-        matched.add(eval);
-      }
-    }
-
-    // transformed -> pushingDownFilters
-    Map<EvalNode, EvalNode> transformedMap =
-        transformEvalsWidthByPassNode(matched, plan, block, node, node.getSubQuery());
-
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
-    visit(context, plan, plan.getBlock(node.getSubQuery()));
-    context.setToOrigin(transformedMap);
-
-    return node;
-  }
-
-  @Override
-  public LogicalNode visitUnion(FilterPushDownContext context, LogicalPlan plan,
-                                LogicalPlan.QueryBlock block, UnionNode unionNode,
-                                Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode leftNode = unionNode.getLeftChild();
-
-    List<EvalNode> origins = new ArrayList<EvalNode>(context.pushingDownFilters);
-
-    // transformed -> pushingDownFilters
-    Map<EvalNode, EvalNode> transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, leftNode);
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
-    visit(context, plan, plan.getBlock(leftNode));
-
-    if (!context.pushingDownFilters.isEmpty()) {
-      errorFilterPushDown(plan, leftNode, context);
-    }
-
-    LogicalNode rightNode = unionNode.getRightChild();
-    transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, rightNode);
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
-    visit(context, plan, plan.getBlock(rightNode), rightNode, stack);
-
-    if (!context.pushingDownFilters.isEmpty()) {
-      errorFilterPushDown(plan, rightNode, context);
-    }
-
-    // notify all filter matched to upper
-    context.pushingDownFilters.clear();
-    return unionNode;
-  }
-
-  @Override
-  public LogicalNode visitProjection(FilterPushDownContext context,
-                                     LogicalPlan plan,
-                                     LogicalPlan.QueryBlock block,
-                                     ProjectionNode projectionNode,
-                                     Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode childNode = projectionNode.getChild();
-
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
-
-    //copy -> origin
-    BiMap<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(
-        context, block,projectionNode, childNode, notMatched, null, false, 0);
-
-    context.setFiltersTobePushed(transformedMap.keySet());
-
-    stack.push(projectionNode);
-    childNode = visit(context, plan, plan.getBlock(childNode), childNode, stack);
-    stack.pop();
-
-    // find not matched after visiting child
-    for (EvalNode eval: context.pushingDownFilters) {
-      notMatched.add(transformedMap.get(eval));
-    }
-
-    EvalNode qual = null;
-    if (notMatched.size() > 1) {
-      // merged into one eval tree
-      qual = AlgebraicUtil.createSingletonExprFromCNF(notMatched.toArray(new EvalNode[notMatched.size()]));
-    } else if (notMatched.size() == 1) {
-      // if the number of matched expr is one
-      qual = notMatched.get(0);
-    }
-
-    // If there is not matched node add SelectionNode and clear context.pushingDownFilters
-    if (qual != null && LogicalPlanner.checkIfBeEvaluatedAtThis(qual, projectionNode)) {
-      SelectionNode selectionNode = plan.createNode(SelectionNode.class);
-      selectionNode.setInSchema(childNode.getOutSchema());
-      selectionNode.setOutSchema(childNode.getOutSchema());
-      selectionNode.setQual(qual);
-      block.registerNode(selectionNode);
-
-      projectionNode.setChild(selectionNode);
-      selectionNode.setChild(childNode);
-
-      // clean all remain filters because all conditions are merged into a qual
-      context.pushingDownFilters.clear();
-    }
-
-    // if there are remain filters, recover the original names and give back to the upper query block.
-    if (context.pushingDownFilters.size() > 0) {
-      ImmutableSet<EvalNode> copy = ImmutableSet.copyOf(context.pushingDownFilters);
-      context.pushingDownFilters.clear();
-      context.pushingDownFilters.addAll(reverseTransform(transformedMap, copy));
-    }
-
-    return projectionNode;
-  }
-
-  private Collection<EvalNode> reverseTransform(BiMap<EvalNode, EvalNode> map, Set<EvalNode> remainFilters) {
-    Set<EvalNode> reversed = Sets.newHashSet();
-    for (EvalNode evalNode : remainFilters) {
-      reversed.add(map.get(evalNode));
-    }
-    return reversed;
-  }
-
-  private BiMap<EvalNode, EvalNode> findCanPushdownAndTransform(
-      FilterPushDownContext context, LogicalPlan.QueryBlock block, Projectable node,
-      LogicalNode childNode, List<EvalNode> notMatched,
-      Set<String> partitionColumns,
-      boolean ignoreJoin, int columnOffset) throws PlanningException {
-    // canonical name -> target
-    Map<String, Target> nodeTargetMap = new HashMap<String, Target>();
-    for (Target target : node.getTargets()) {
-      nodeTargetMap.put(target.getCanonicalName(), target);
-    }
-
-    // copy -> origin
-    BiMap<EvalNode, EvalNode> matched = HashBiMap.create();
-
-    for (EvalNode eval : context.pushingDownFilters) {
-      if (ignoreJoin && EvalTreeUtil.isJoinQual(block, eval, true)) {
-        notMatched.add(eval);
-        continue;
-      }
-      // If all column is field eval, can push down.
-      Set<Column> evalColumns = EvalTreeUtil.findUniqueColumns(eval);
-      boolean columnMatched = true;
-      for (Column c : evalColumns) {
-        Target target = nodeTargetMap.get(c.getQualifiedName());
-        if (target == null) {
-          columnMatched = false;
-          break;
-        }
-        if (target.getEvalTree().getType() != EvalType.FIELD) {
-          columnMatched = false;
-          break;
-        }
-      }
-
-      if (columnMatched) {
-        // transform eval column to child's output column
-        EvalNode copyEvalNode = transformEval(node, childNode, eval, nodeTargetMap, partitionColumns, columnOffset);
-        if (copyEvalNode != null) {
-          matched.put(copyEvalNode, eval);
-        } else {
-          notMatched.add(eval);
-        }
-      } else {
-        notMatched.add(eval);
-      }
-    }
-
-    return matched;
-  }
-
-  private EvalNode transformEval(Projectable node, LogicalNode childNode, EvalNode origin,
-                                 Map<String, Target> targetMap, Set<String> partitionColumns,
-                                 int columnOffset) throws PlanningException {
-    Schema outputSchema = childNode != null ? childNode.getOutSchema() : node.getInSchema();
-    EvalNode copy;
-    try {
-      copy = (EvalNode) origin.clone();
-    } catch (CloneNotSupportedException e) {
-      throw new PlanningException(e);
-    }
-    Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
-    for (Column c: columns) {
-      Target target = targetMap.get(c.getQualifiedName());
-      if (target == null) {
-        throw new PlanningException(
-            "Invalid Filter PushDown: No such a corresponding target '"
-                + c.getQualifiedName() + " for FilterPushDown(" + origin + "), " +
-                "(PID=" + node.getPID() + ")"
-        );
-      }
-      EvalNode targetEvalNode = target.getEvalTree();
-      if (targetEvalNode.getType() != EvalType.FIELD) {
-        throw new PlanningException(
-            "Invalid Filter PushDown: '" + c.getQualifiedName() + "' target is not FieldEval " +
-                "(PID=" + node.getPID() + ")"
-        );
-      }
-
-      FieldEval fieldEval = (FieldEval)targetEvalNode;
-      Column targetInputColumn = fieldEval.getColumnRef();
-
-      int index;
-      if (targetInputColumn.hasQualifier()) {
-        index = node.getInSchema().getColumnId(targetInputColumn.getQualifiedName());
-      } else {
-        index = node.getInSchema().getColumnIdByName(targetInputColumn.getQualifiedName());
-      }
-      if (columnOffset > 0) {
-        index = index - columnOffset;
-      }
-      if (index < 0 || index >= outputSchema.size()) {
-        if (partitionColumns != null && !partitionColumns.isEmpty() && node instanceof ScanNode) {
-          ScanNode scanNode = (ScanNode)node;
-          boolean isPartitionColumn = false;
-          if (CatalogUtil.isFQColumnName(partitionColumns.iterator().next())) {
-            isPartitionColumn = partitionColumns.contains(
-                CatalogUtil.buildFQName(scanNode.getTableName(), c.getSimpleName()));
-          } else {
-            isPartitionColumn = partitionColumns.contains(c.getSimpleName());
-          }
-          if (isPartitionColumn) {
-            EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(),
-                scanNode.getCanonicalName() + "." + c.getSimpleName());
-          } else {
-            return null;
-          }
-        } else {
-          return null;
-        }
-      } else {
-        Column outputColumn = outputSchema.getColumn(index);
-        EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), outputColumn.getQualifiedName());
-      }
-    }
-
-    return copy;
-  }
-
-  /**
-   * Find aggregation columns in filter eval and add having clause or add HavingNode.
-   * @param context
-   * @param plan
-   * @param block
-   * @param parentNode  If null, having is parent
-   * @param havingNode      If null, projection is parent
-   * @param groupByNode
-   * @return matched origin eval
-   * @throws PlanningException
-   */
-  private List<EvalNode> addHavingNode(FilterPushDownContext context, LogicalPlan plan,
-                                       LogicalPlan.QueryBlock block,
-                                       UnaryNode parentNode,
-                                       HavingNode havingNode,
-                                       GroupbyNode groupByNode) throws PlanningException {
-    // find aggregation column
-    Set<Column> groupingColumns = new HashSet<Column>(Arrays.asList(groupByNode.getGroupingColumns()));
-    Set<String> aggrFunctionOutColumns = new HashSet<String>();
-    for (Column column : groupByNode.getOutSchema().getColumns()) {
-      if (!groupingColumns.contains(column)) {
-        aggrFunctionOutColumns.add(column.getQualifiedName());
-      }
-    }
-
-    List<EvalNode> aggrEvalOrigins = new ArrayList<EvalNode>();
-    List<EvalNode> aggrEvals = new ArrayList<EvalNode>();
-
-    for (EvalNode eval : context.pushingDownFilters) {
-      EvalNode copy = null;
-      try {
-        copy = (EvalNode)eval.clone();
-      } catch (CloneNotSupportedException e) {
-      }
-      boolean isEvalAggrFunction = false;
-      for (Column evalColumn : EvalTreeUtil.findUniqueColumns(copy)) {
-        if (aggrFunctionOutColumns.contains(evalColumn.getSimpleName())) {
-          EvalTreeUtil.changeColumnRef(copy, evalColumn.getQualifiedName(), evalColumn.getSimpleName());
-          isEvalAggrFunction = true;
-          break;
-        }
-      }
-      if (isEvalAggrFunction) {
-        aggrEvals.add(copy);
-        aggrEvalOrigins.add(eval);
-      }
-    }
-
-    if (aggrEvals.isEmpty()) {
-      return aggrEvalOrigins;
-    }
-
-    // transform
-
-    HavingNode workingHavingNode;
-    if (havingNode != null) {
-      workingHavingNode = havingNode;
-      aggrEvals.add(havingNode.getQual());
-    } else {
-      workingHavingNode = plan.createNode(HavingNode.class);
-      block.registerNode(workingHavingNode);
-      parentNode.setChild(workingHavingNode);
-      workingHavingNode.setChild(groupByNode);
-    }
-
-    EvalNode qual = null;
-    if (aggrEvals.size() > 1) {
-      // merged into one eval tree
-      qual = AlgebraicUtil.createSingletonExprFromCNF(aggrEvals.toArray(new EvalNode[aggrEvals.size()]));
-    } else if (aggrEvals.size() == 1) {
-      // if the number of matched expr is one
-      qual = aggrEvals.get(0);
-    }
-
-    // If there is not matched node add SelectionNode and clear context.pushingDownFilters
-    if (qual != null) {
-      workingHavingNode.setQual(qual);
-    }
-
-    return aggrEvalOrigins;
-  }
-
-  @Override
-  public LogicalNode visitWindowAgg(FilterPushDownContext context, LogicalPlan plan,
-                                  LogicalPlan.QueryBlock block, WindowAggNode winAggNode,
-                                  Stack<LogicalNode> stack) throws PlanningException {
-    stack.push(winAggNode);
-    super.visitWindowAgg(context, plan, block, winAggNode, stack);
-    stack.pop();
-    return winAggNode;
-  }
-
-
-  @Override
-  public LogicalNode visitGroupBy(FilterPushDownContext context, LogicalPlan plan,
-                                  LogicalPlan.QueryBlock block, GroupbyNode groupbyNode,
-                                  Stack<LogicalNode> stack) throws PlanningException {
-    LogicalNode parentNode = stack.peek();
-    List<EvalNode> aggrEvals;
-    if (parentNode.getType() == NodeType.HAVING) {
-      aggrEvals = addHavingNode(context, plan, block, null, (HavingNode)parentNode, groupbyNode);
-    } else {
-      aggrEvals = addHavingNode(context, plan, block, (UnaryNode)parentNode, null, groupbyNode);
-    }
-
-    if (aggrEvals != null) {
-      // remove aggregation eval from conext
-      context.pushingDownFilters.removeAll(aggrEvals);
-    }
-
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
-    // transform
-    Map<EvalNode, EvalNode> tranformed =
-        findCanPushdownAndTransform(context, block, groupbyNode,groupbyNode.getChild(), notMatched, null, false, 0);
-
-    context.setFiltersTobePushed(tranformed.keySet());
-    LogicalNode current = super.visitGroupBy(context, plan, block, groupbyNode, stack);
-
-    context.setToOrigin(tranformed);
-    context.addFiltersTobePushed(notMatched);
-
-    return current;
-  }
-
-  @Override
-  public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
-                               LogicalPlan.QueryBlock block, ScanNode scanNode,
-                               Stack<LogicalNode> stack) throws PlanningException {
-    List<EvalNode> matched = Lists.newArrayList();
-
-    // find partition column and check matching
-    Set<String> partitionColumns = new HashSet<String>();
-    TableDesc table = scanNode.getTableDesc();
-    boolean hasQualifiedName = false;
-    if (table.hasPartition()) {
-      for (Column c: table.getPartitionMethod().getExpressionSchema().getColumns()) {
-        partitionColumns.add(c.getQualifiedName());
-        hasQualifiedName = c.hasQualifier();
-      }
-    }
-    Set<EvalNode> partitionEvals = new HashSet<EvalNode>();
-    for (EvalNode eval : context.pushingDownFilters) {
-      if (table.hasPartition()) {
-        Set<Column> columns = EvalTreeUtil.findUniqueColumns(eval);
-        if (columns.size() != 1) {
-          continue;
-        }
-        Column column = columns.iterator().next();
-
-        // If catalog runs with HCatalog, partition column is a qualified name
-        // Else partition column is a simple name
-        boolean isPartitionColumn = false;
-        if (hasQualifiedName) {
-          isPartitionColumn = partitionColumns.contains(CatalogUtil.buildFQName(table.getName(), column.getSimpleName()));
-        } else {
-          isPartitionColumn = partitionColumns.contains(column.getSimpleName());
-        }
-        if (isPartitionColumn) {
-          EvalNode copy;
-          try {
-            copy = (EvalNode) eval.clone();
-          } catch (CloneNotSupportedException e) {
-            throw new PlanningException(e);
-          }
-          EvalTreeUtil.changeColumnRef(copy, column.getQualifiedName(),
-              scanNode.getCanonicalName() + "." + column.getSimpleName());
-          matched.add(copy);
-          partitionEvals.add(eval);
-        }
-      }
-    }
-
-    context.pushingDownFilters.removeAll(partitionEvals);
-
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
-
-    // transform
-    Map<EvalNode, EvalNode> transformed =
-        findCanPushdownAndTransform(context, block, scanNode, null, notMatched, partitionColumns, true, 0);
-
-    for (EvalNode eval : transformed.keySet()) {
-      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {
-        matched.add(eval);
-      }
-    }
-
-    EvalNode qual = null;
-    if (matched.size() > 1) {
-      // merged into one eval tree
-      qual = AlgebraicUtil.createSingletonExprFromCNF(
-          matched.toArray(new EvalNode[matched.size()]));
-    } else if (matched.size() == 1) {
-      // if the number of matched expr is one
-      qual = matched.iterator().next();
-    }
-
-    if (qual != null) { // if a matched qual exists
-      scanNode.setQual(qual);
-    }
-
-    for (EvalNode matchedEval: matched) {
-      transformed.remove(matchedEval);
-    }
-
-    context.setToOrigin(transformed);
-    context.addFiltersTobePushed(notMatched);
-
-    return scanNode;
-  }
-
-  private void errorFilterPushDown(LogicalPlan plan, LogicalNode node,
-                                   FilterPushDownContext context) throws PlanningException {
-    String notMatchedNodeStr = "";
-    String prefix = "";
-    for (EvalNode notMatchedNode: context.pushingDownFilters) {
-      notMatchedNodeStr += prefix + notMatchedNode;
-      prefix = ", ";
-    }
-    throw new PlanningException("FilterPushDown failed cause some filters not matched: " + notMatchedNodeStr + "\n" +
-        "Error node: " + node.getPlanString() + "\n" +
-        plan.toString());
-  }
-}


Mime
View raw message