tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-316: Improve GreedyHeuristicJoinOrderAlgorithm to deal with non-commutative joins. (hyunsik)
Date Thu, 05 Dec 2013 15:03:40 GMT
TAJO-316: Improve GreedyHeuristicJoinOrderAlgorithm to deal with non-commutative joins. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/39fe4d76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/39fe4d76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/39fe4d76

Branch: refs/heads/master
Commit: 39fe4d765f84af5094387d3a93399c1a0a9fef10
Parents: 45de54b
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Fri Dec 6 00:03:25 2013 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Fri Dec 6 00:03:25 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../benchmark/tpch/customer.schema              |   2 +-
 .../java/org/apache/tajo/benchmark/TPCH.java    |  20 +-
 .../engine/planner/BasicLogicalPlanVisitor.java |   4 +-
 .../tajo/engine/planner/LogicalOptimizer.java   |  49 +-
 .../tajo/engine/planner/LogicalPlanner.java     |   4 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  22 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |  58 ++-
 .../planner/global/ExecutionBlockCursor.java    |  34 +-
 .../engine/planner/global/GlobalPlanner.java    | 509 +++++++++----------
 .../tajo/engine/planner/global/MasterPlan.java  |   1 +
 .../tajo/engine/planner/logical/LimitNode.java  |   2 +-
 .../planner/logical/TableSubQueryNode.java      |   2 +-
 .../planner/logical/join/FoundJoinOrder.java    |  15 +-
 .../join/GreedyHeuristicJoinOrderAlgorithm.java | 365 ++++++-------
 .../engine/planner/logical/join/JoinEdge.java   |  17 +-
 .../engine/planner/logical/join/JoinGraph.java  |  38 +-
 .../logical/join/JoinOrderAlgorithm.java        |  16 +-
 .../planner/rewrite/FilterPushDownRule.java     |   4 +-
 .../tajo/master/querymaster/Repartitioner.java  |   2 +-
 .../tajo/engine/query/TestInsertQuery.java      |   8 +-
 .../apache/tajo/engine/query/TestJoinQuery.java |  66 ++-
 .../tajo/engine/query/TestSelectQuery.java      |   2 +-
 .../tajo/engine/query/TestTableSubQuery.java    |  23 +
 .../tajo/master/TestExecutionBlockCursor.java   |   6 +-
 .../src/test/tpch/customer.tbl                  |   2 +
 26 files changed, 698 insertions(+), 576 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 96e444b..1ca5b42 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -62,6 +62,9 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-316: Improve GreedyHeuristicJoinOrderAlgorithm to deal with
+    non-commutative joins. (hyunsik)
+
     TAJO-371: Increase the default value of worker memory. (jihoon)
 
     TAJO-284: Add table partitioning entry to Catalog. (jaehwa)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/benchmark/tpch/customer.schema
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/benchmark/tpch/customer.schema b/tajo-core/tajo-core-backend/benchmark/tpch/customer.schema
index 192873a..9df7c38 100644
--- a/tajo-core/tajo-core-backend/benchmark/tpch/customer.schema
+++ b/tajo-core/tajo-core-backend/benchmark/tpch/customer.schema
@@ -1,5 +1,5 @@
 create table customer (
-	c_custkey long, /* primary key */
+	c_custkey long,
 	c_name string,
 	c_nationkey long,
 	c_phone string,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 291b83f..841b7cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.benchmark;
 
+import com.google.common.collect.Maps;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +32,7 @@ import org.apache.tajo.storage.CSVFile;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.Map;
 
 public class TPCH extends BenchmarkSet {
   private final Log LOG = LogFactory.getLog(TPCH.class);
@@ -45,10 +47,23 @@ public class TPCH extends BenchmarkSet {
   public static String PARTSUPP = "partsupp";
   public static String SUPPLIER = "supplier";
 
+  public static final Map<String, Long> tableVolumes = Maps.newHashMap();
+
+  static {
+    tableVolumes.put(LINEITEM, 759863287L);
+    tableVolumes.put(CUSTOMER, 24346144L);
+    tableVolumes.put(NATION, 2224L);
+    tableVolumes.put(PART, 24135125L);
+    tableVolumes.put(REGION, 389L);
+    tableVolumes.put(ORDERS, 171952161L);
+    tableVolumes.put(PARTSUPP, 118984616L);
+    tableVolumes.put(SUPPLIER, 1409184L);
+  }
+
   @Override
   public void loadSchemas() {
     Schema lineitem = new Schema()
-        .addColumn("l_orderkey", Type.INT8) // 0
+        .addColumn("l_orderkey", Type.INT4) // 0
         .addColumn("l_partkey", Type.INT4) // 1
         .addColumn("l_suppkey", Type.INT4) // 2
         .addColumn("l_linenumber", Type.INT4) // 3
@@ -105,7 +120,7 @@ public class TPCH extends BenchmarkSet {
     schemas.put(REGION, region);
 
     Schema orders = new Schema()
-        .addColumn("o_orderkey", Type.INT8) // 0
+        .addColumn("o_orderkey", Type.INT4) // 0
         .addColumn("o_custkey", Type.INT4) // 1
         .addColumn("o_orderstatus", Type.TEXT) // 2
         .addColumn("o_totalprice", Type.FLOAT4) // 3
@@ -167,6 +182,7 @@ public class TPCH extends BenchmarkSet {
   private void loadTable(String tableName) throws ServiceException {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
     meta.putOption(CSVFile.DELIMITER, "|");
+
     try {
       tajo.createExternalTable(tableName, getSchema(tableName), new Path(dataDir, tableName), meta);
     } catch (SQLException s) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 22e5f9e..78c7c22 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -40,12 +40,12 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
   }
 
   public CONTEXT visit(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block) throws PlanningException {
-    RESULT result = visitChild(context, plan, block.getRoot(), new Stack<LogicalNode>());
+    visitChild(context, plan, block.getRoot(), new Stack<LogicalNode>());
     return context;
   }
 
   public CONTEXT visit(CONTEXT context, LogicalPlan plan, LogicalNode node) throws PlanningException {
-    RESULT result = visitChild(context, plan, node, new Stack<LogicalNode>());
+    visitChild(context, plan, node, new Stack<LogicalNode>());
     return context;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index 943ca6f..640383e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
@@ -33,7 +34,6 @@ import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
 import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 
-import java.util.Collection;
 import java.util.Set;
 import java.util.Stack;
 
@@ -83,7 +83,7 @@ public class LogicalOptimizer {
 
       // finding join order and restore remain filter order
       FoundJoinOrder order = joinOrderAlgorithm.findBestOrder(plan, block,
-          joinGraphContext.joinGraph, joinGraphContext.quals, joinGraphContext.relationsWithoutQual);
+          joinGraphContext.joinGraph, joinGraphContext.relationsForProduct);
       block.setJoinNode(order.getOrderedJoin());
 
       String optimizedOrder = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
@@ -94,9 +94,18 @@ public class LogicalOptimizer {
   }
 
   private static class JoinGraphContext {
+    LogicalPlan.QueryBlock block;
     JoinGraph joinGraph = new JoinGraph();
     Set<EvalNode> quals = Sets.newHashSet();
-    Set<String> relationsWithoutQual = Sets.newHashSet();
+    Set<String> relationsForProduct = Sets.newHashSet();
+
+    public JoinGraphContext(LogicalPlan.QueryBlock block) {
+      this.block = block;
+    }
+
+    public LogicalPlan.QueryBlock getBlock() {
+      return block;
+    }
   }
 
   private static class JoinGraphBuilder extends BasicLogicalPlanVisitor<JoinGraphContext, LogicalNode> {
@@ -106,9 +115,14 @@ public class LogicalOptimizer {
       instance = new JoinGraphBuilder();
     }
 
+    /**
+     * This is based on the assumption that all join and filter conditions are placed on the appropriate join and
+     * scan operators. In other words, filter push down must be performed before this method.
+     * Otherwise, this method may build an incorrect join graph.
+     */
     public static JoinGraphContext buildJoinGraph(LogicalPlan plan, LogicalPlan.QueryBlock block)
         throws PlanningException {
-      JoinGraphContext joinGraphContext = new JoinGraphContext();
+      JoinGraphContext joinGraphContext = new JoinGraphContext(block);
       instance.visit(joinGraphContext, plan, block);
       return joinGraphContext;
     }
@@ -126,14 +140,17 @@ public class LogicalOptimizer {
         throws PlanningException {
       super.visitJoin(joinGraphContext, plan, joinNode, stack);
       if (joinNode.hasJoinQual()) {
-        Collection<EvalNode> nonJoinQual =
-            joinGraphContext.joinGraph.addJoin(joinNode.getJoinType(), joinNode.getJoinQual());
-        joinGraphContext.quals.addAll(nonJoinQual);
+        joinGraphContext.joinGraph.addJoin(plan, joinGraphContext.block, joinNode);
       } else {
         LogicalNode leftChild = joinNode.getLeftChild();
+        LogicalNode rightChild = joinNode.getRightChild();
         if (leftChild instanceof RelationNode) {
           RelationNode rel = (RelationNode) leftChild;
-          joinGraphContext.relationsWithoutQual.add(rel.getCanonicalName());
+          joinGraphContext.relationsForProduct.add(rel.getCanonicalName());
+        }
+        if (rightChild instanceof RelationNode) {
+          RelationNode rel = (RelationNode) rightChild;
+          joinGraphContext.relationsForProduct.add(rel.getCanonicalName());
         }
       }
       return joinNode;
@@ -163,13 +180,27 @@ public class LogicalOptimizer {
       stack.push(joinNode);
       sb.append("(");
       visitChild(sb, plan, joinNode.getLeftChild(), stack);
-      sb.append(",");
+      sb.append(" ").append(getJoinNotation(joinNode.getJoinType())).append(" ");
       visitChild(sb, plan, joinNode.getRightChild(), stack);
       sb.append(")");
       stack.pop();
       return joinNode;
     }
 
+    private static String getJoinNotation(JoinType joinType) {
+      switch (joinType) {
+      case CROSS: return "⋈";
+      case INNER: return "⋈θ";
+      case LEFT_OUTER: return "⟕";
+      case RIGHT_OUTER: return "⟖";
+      case FULL_OUTER: return "⟗";
+      case LEFT_SEMI: return "⋉";
+      case RIGHT_SEMI: return "⋊";
+      case LEFT_ANTI: return "▷";
+      }
+      return ",";
+    }
+
     @Override
     public LogicalNode visitTableSubQuery(StringBuilder sb, LogicalPlan plan, TableSubQueryNode node,
                                           Stack<LogicalNode> stack) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index b5f8d2f..30d3e30 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -170,7 +170,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // 3. build scan plan
     Relation relation = expr;
     TableDesc desc = catalog.getTableDesc(relation.getName());
-    updatePhysicalInfo(desc);
+    if (!desc.hasStats()) {
+      updatePhysicalInfo(desc);
+    }
 
     ScanNode scanNode;
     if (relation.hasAlias()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index db58e32..7bc9455 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
@@ -45,7 +44,6 @@ import org.apache.tajo.util.IndexUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
@@ -293,8 +291,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long leftSize = estimateSizeRecursive(context, leftLineage);
     long rightSize = estimateSizeRecursive(context, rightLineage);
 
@@ -366,7 +364,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long rightTableVolume = estimateSizeRecursive(context, rightLineage);
 
     if (rightTableVolume < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
@@ -385,7 +383,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
     //if the left operand is small enough => implement it as a left outer hash join with exchanged operators (note:
     // blocking, but merge join is blocking as well)
-    String [] outerLineage4 = PlannerUtil.getLineage(plan.getLeftChild());
+    String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
     long outerSize = estimateSizeRecursive(context, outerLineage4);
     if (outerSize < conf.getLongVar(TajoConf.ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
       LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
@@ -456,8 +454,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                             PhysicalExec leftExec, PhysicalExec rightExec)
       throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long outerSize2 = estimateSizeRecursive(context, leftLineage);
     long innerSize2 = estimateSizeRecursive(context, rightLineage);
 
@@ -493,8 +491,8 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
                                                    PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
-    String [] leftLineage = PlannerUtil.getLineage(plan.getLeftChild());
-    String [] rightLineage = PlannerUtil.getLineage(plan.getRightChild());
+    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
     long outerSize2 = estimateSizeRecursive(context, leftLineage);
     long innerSize2 = estimateSizeRecursive(context, rightLineage);
     final long threshold = 1048576 * 128;
@@ -705,7 +703,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       return createInMemoryHashAggregation(context, groupbyNode, subOp);
     }
 
-    String [] outerLineage = PlannerUtil.getLineage(groupbyNode.getChild());
+    String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
     final long threshold = conf.getLongVar(TajoConf.ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
 
@@ -737,7 +735,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
 
   public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
                                      PhysicalExec child) throws IOException {
-    String [] outerLineage = PlannerUtil.getLineage(sortNode.getChild());
+    String [] outerLineage = PlannerUtil.getRelationLineage(sortNode.getChild());
     long estimatedSize = estimateSizeRecursive(context, outerLineage);
     final long threshold = 1048576 * 2000;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 9371463..28f5725 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -22,8 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.ObjectArrays;
 import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -36,8 +35,6 @@ import org.apache.tajo.storage.TupleComparator;
 import java.util.*;
 
 public class PlannerUtil {
-  private static final Log LOG = LogFactory.getLog(PlannerUtil.class);
-
   public static String normalizeTableName(String tableName) {
     return tableName.toLowerCase();
   }
@@ -50,8 +47,14 @@ public class PlannerUtil {
 
     return baseNode.getType() == NodeType.CREATE_TABLE || baseNode.getType() == NodeType.DROP_TABLE;
   }
-  
-  public static String [] getLineage(LogicalNode node) {
+
+  /**
+   * Get all scan nodes from a logical operator tree.
+   *
+   * @param node a start node
+   * @return an array of relation names
+   */
+  public static String [] getRelationLineage(LogicalNode node) {
     LogicalNode [] scans =  PlannerUtil.findAllNodes(node, NodeType.SCAN);
     String [] tableNames = new String[scans.length];
     ScanNode scan;
@@ -61,6 +64,41 @@ public class PlannerUtil {
     }
     return tableNames;
   }
+
+  /**
+   * Get all scan nodes from a logical operator tree within a query block
+   *
+   * @param node a start node
+   * @return a collection of relation names
+   */
+  public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode node)
+      throws PlanningException {
+    RelationFinderVisitor visitor = new RelationFinderVisitor();
+    visitor.visit(null, plan, node);
+    return visitor.getFoundRelations();
+  }
+
+  public static class RelationFinderVisitor extends BasicLogicalPlanVisitor<Object, LogicalNode> {
+    private Set<String> foundRelNameSet = Sets.newHashSet();
+
+    public Set<String> getFoundRelations() {
+      return foundRelNameSet;
+    }
+
+    @Override
+    public LogicalNode visitChild(Object context, LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack)
+        throws PlanningException {
+      if (node.getType() != NodeType.TABLE_SUBQUERY) {
+        super.visitChild(context, plan, node, stack);
+      }
+
+      if (node instanceof RelationNode) {
+        foundRelNameSet.add(((RelationNode) node).getCanonicalName());
+      }
+
+      return node;
+    }
+  }
   
   /**
    * Delete the logical node from a plan.
@@ -322,8 +360,8 @@ public class PlannerUtil {
         return false;
       }
 
-      String [] outer = getLineage(joinNode.getLeftChild());
-      String [] inner = getLineage(joinNode.getRightChild());
+      String [] outer = getRelationLineage(joinNode.getLeftChild());
+      String [] inner = getRelationLineage(joinNode.getRightChild());
 
       Set<String> o = Sets.newHashSet(outer);
       Set<String> i = Sets.newHashSet(inner);
@@ -646,4 +684,8 @@ public class PlannerUtil {
       throw new RuntimeException(e);
     }
   }
+
+  public static boolean isCommutativeJoin(JoinType joinType) {
+    return joinType == JoinType.INNER;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
index 88fd68a..d4ab068 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlockCursor.java
@@ -15,6 +15,7 @@
 package org.apache.tajo.engine.planner.global;
 
 import java.util.ArrayList;
+import java.util.Stack;
 
 /**
  * A distributed execution plan (DEP) is a direct acyclic graph (DAG) of ExecutionBlocks.
@@ -35,16 +36,20 @@ public class ExecutionBlockCursor {
     return orderedBlocks.size();
   }
 
+  // Add all execution blocks in a depth first and postfix order
   private void buildOrder(ExecutionBlock current) {
+    Stack<ExecutionBlock> stack = new Stack<ExecutionBlock>();
     if (!masterPlan.isLeaf(current.getId())) {
-      if (masterPlan.getChildCount(current.getId()) == 1) {
-        ExecutionBlock block = masterPlan.getChild(current, 0);
-        buildOrder(block);
-      } else {
-        for (ExecutionBlock exec : masterPlan.getChilds(current)) {
-          buildOrder(exec);
+      for (ExecutionBlock execBlock : masterPlan.getChilds(current)) {
+        if (!masterPlan.isLeaf(execBlock)) {
+          buildOrder(execBlock);
+        } else {
+          stack.push(execBlock);
         }
       }
+      for (ExecutionBlock execBlock : stack) {
+        buildOrder(execBlock);
+      }
     }
     orderedBlocks.add(current);
   }
@@ -68,4 +73,21 @@ public class ExecutionBlockCursor {
   public void reset() {
     cursor = 0;
   }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < orderedBlocks.size(); i++) {
+      if (i == (cursor == 0 ? 0 : cursor - 1)) {
+        sb.append("(").append(orderedBlocks.get(i).getId().getId()).append(")");
+      } else {
+        sb.append(orderedBlocks.get(i).getId().getId());
+      }
+
+      if (i < orderedBlocks.size() - 1) {
+        sb.append(",");
+      }
+    }
+
+    return sb.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index f7540e7..f85b170 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -19,6 +19,8 @@
 package org.apache.tajo.engine.planner.global;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,81 +53,38 @@ public class GlobalPlanner {
 
   public class GlobalPlanContext {
     MasterPlan plan;
-    Set<String> broadcastTables = new HashSet<String>();
     LogicalNode topmost;
-    LogicalNode lastRepartionableNode;
-    ExecutionBlock topMostLeftExecBlock;
+
+    Map<Integer, ExecutionBlock> execBlockMap = Maps.newHashMap();
   }
 
   /**
    * Builds a master plan from the given logical plan.
    */
-  public void build(MasterPlan masterPlan)
-      throws IOException, PlanningException {
+  public void build(MasterPlan masterPlan) throws IOException, PlanningException {
 
     DistributedPlannerVisitor planner = new DistributedPlannerVisitor();
     GlobalPlanContext globalPlanContext = new GlobalPlanContext();
     globalPlanContext.plan = masterPlan;
     LOG.info(masterPlan.getLogicalPlan());
 
-    LogicalNode rootNode = PlannerUtil.clone(masterPlan.getLogicalPlan().getRootBlock().getRoot());
-    planner.visitChild(globalPlanContext, masterPlan.getLogicalPlan(), rootNode, new Stack<LogicalNode>());
-
-    ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
-
-    if (globalPlanContext.lastRepartionableNode != null
-        && globalPlanContext.lastRepartionableNode.getType() == NodeType.UNION) {
-      UnionNode unionNode = (UnionNode) globalPlanContext.lastRepartionableNode;
-      ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
-      UnionsFinderContext finderContext = new UnionsFinderContext();
-      finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
-
-      for (UnionNode union : finderContext.unionList) {
-        TableSubQueryNode leftSubQuery = union.getLeftChild();
-        TableSubQueryNode rightSubQuery = union.getRightChild();
-        if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
-          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-          execBlock.setPlan(leftSubQuery);
-          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
-          masterPlan.addConnect(dataChannel);
-        }
-        if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
-          ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-          execBlock.setPlan(rightSubQuery);
-          DataChannel dataChannel = new DataChannel(execBlock, terminalBlock, NONE_PARTITION, 1);
-          masterPlan.addConnect(dataChannel);
-        }
-      }
-    } else {
-      DataChannel dataChannel = new DataChannel(globalPlanContext.topMostLeftExecBlock, terminalBlock, NONE_PARTITION, 1);
-      dataChannel.setSchema(globalPlanContext.topmost.getOutSchema());
-      masterPlan.addConnect(dataChannel);
-    }
-    masterPlan.setTerminal(terminalBlock);
-    LOG.info(masterPlan);
-  }
+    LogicalNode inputPlan = PlannerUtil.clone(masterPlan.getLogicalPlan().getRootBlock().getRoot());
+    LogicalNode lastNode = planner.visitChild(globalPlanContext, masterPlan.getLogicalPlan(), inputPlan,
+        new Stack<LogicalNode>());
 
-  private ExecutionBlock buildRepartitionBlocks(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode curNode,
-                                                LogicalNode childNode, ExecutionBlock lastChildBlock)
-      throws PlanningException {
+    ExecutionBlock childExecBlock = globalPlanContext.execBlockMap.get(lastNode.getPID());
 
-    ExecutionBlock currentBlock = null;
-    ExecutionBlock childBlock;
-    childBlock = lastChildBlock;
-
-    NodeType shuffleRequiredNodeType = lastDistNode.getType();
-    if (shuffleRequiredNodeType == NodeType.GROUP_BY) {
-      ExecutionBlock [] blocks = buildGroupBy(masterPlan, lastDistNode, curNode, childNode, childBlock);
-      currentBlock = blocks[0];
-    } else if (shuffleRequiredNodeType == NodeType.SORT) {
-      ExecutionBlock [] blocks = buildSortPlan(masterPlan, lastDistNode, curNode, childNode, childBlock);
-      currentBlock = blocks[0];
-    } else if (shuffleRequiredNodeType == NodeType.JOIN) {
-      ExecutionBlock [] blocks = buildJoinPlan(masterPlan, lastDistNode, childBlock, lastChildBlock);
-      currentBlock = blocks[0];
+    if (childExecBlock.getPlan() != null) {
+      ExecutionBlock terminalBlock = masterPlan.createTerminalBlock();
+      DataChannel dataChannel = new DataChannel(childExecBlock, terminalBlock, NONE_PARTITION, 1);
+      dataChannel.setSchema(lastNode.getOutSchema());
+      masterPlan.addConnect(dataChannel);
+      masterPlan.setTerminal(terminalBlock);
+    } else {
+      masterPlan.setTerminal(childExecBlock);
     }
 
-    return currentBlock;
+    LOG.info(masterPlan);
   }
 
   public static ScanNode buildInputExecutor(LogicalPlan plan, DataChannel channel) {
@@ -153,12 +112,12 @@ public class GlobalPlanner {
     return channel;
   }
 
-  private ExecutionBlock [] buildJoinPlan(MasterPlan masterPlan, LogicalNode lastDistNode,
-                                          ExecutionBlock childBlock, ExecutionBlock lastChildBlock)
+  private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
+                                          ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
+    MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
-    JoinNode joinNode = (JoinNode) lastDistNode;
     LogicalNode leftNode = joinNode.getLeftChild();
     LogicalNode rightNode = joinNode.getRightChild();
 
@@ -189,22 +148,14 @@ public class GlobalPlanner {
         if (rightBroadcasted) {
           currentBlock.addBroadcastTable(rightScan.getCanonicalName());
         }
-        return new ExecutionBlock[] { currentBlock, childBlock };
+
+        context.execBlockMap.remove(leftScan.getPID());
+        context.execBlockMap.remove(rightScan.getPID());
+        return currentBlock;
       }
     }
 
     // symmetric repartition join
-
-    ExecutionBlock leftBlock;
-    if (lastChildBlock == null) {
-      leftBlock = masterPlan.newExecutionBlock();
-      leftBlock.setPlan(leftNode);
-    } else {
-      leftBlock = lastChildBlock;
-    }
-    ExecutionBlock rightBlock = masterPlan.newExecutionBlock();
-    rightBlock.setPlan(rightNode);
-
     currentBlock = masterPlan.newExecutionBlock();
 
     DataChannel leftChannel = createDataChannelFromJoin(leftBlock, rightBlock, currentBlock, joinNode, true);
@@ -220,184 +171,127 @@ public class GlobalPlanner {
     masterPlan.addConnect(leftChannel);
     masterPlan.addConnect(rightChannel);
 
-    return new ExecutionBlock[] { currentBlock, childBlock };
-
+    return currentBlock;
   }
 
-  private ExecutionBlock [] buildGroupBy(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
-                                         LogicalNode childNode, ExecutionBlock childBlock) throws PlanningException {
-    ExecutionBlock currentBlock = null;
-    GroupbyNode groupByNode = (GroupbyNode) lastDistNode;
-
-    if (groupByNode.isDistinct()) {
-      if (childBlock == null) { // first repartition node
-        childBlock = masterPlan.newExecutionBlock();
-      }
-      childBlock.setPlan(groupByNode.getChild());
-      currentBlock = masterPlan.newExecutionBlock();
-
-      LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
-
-      for (Target target : groupByNode.getTargets()) {
-        List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
-        for (AggregationFunctionCallEval function : functions) {
-          if (function.isDistinct()) {
-            columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
-          }
+  private ExecutionBlock buildDistinctGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
+                                              GroupbyNode groupbyNode) {
+    // setup child block
+    LogicalNode topMostOfFirstPhase = groupbyNode.getChild();
+    childBlock.setPlan(topMostOfFirstPhase);
+
+    // setup current block
+    ExecutionBlock currentBlock = context.plan.newExecutionBlock();
+    LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
+    for (Target target : groupbyNode.getTargets()) {
+      List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
+      for (AggregationFunctionCallEval function : functions) {
+        if (function.isDistinct()) {
+          columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
         }
       }
+    }
 
-      Set<Column> existingColumns = Sets.newHashSet(groupByNode.getGroupingColumns());
-      columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
-      SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
-      currentBlock.getEnforcer().enforceSortAggregation(groupByNode.getPID(), sortSpecs);
-
-      DataChannel channel;
-      channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-      channel.setPartitionKey(groupByNode.getGroupingColumns());
-      channel.setSchema(groupByNode.getInSchema());
+    // Set sort aggregation enforcer to the second groupby node
+    Set<Column> existingColumns = Sets.newHashSet(groupbyNode.getGroupingColumns());
+    columnsForDistinct.removeAll(existingColumns); // remove existing grouping columns
+    SortSpec [] sortSpecs = PlannerUtil.columnsToSortSpec(columnsForDistinct);
+    currentBlock.getEnforcer().enforceSortAggregation(groupbyNode.getPID(), sortSpecs);
 
-      GroupbyNode secondGroupBy = groupByNode;
-      ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-      secondGroupBy.setChild(scanNode);
 
-      LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-      if (parent instanceof UnaryNode && parent != secondGroupBy) {
-        ((UnaryNode)parent).setChild(secondGroupBy);
-      }
+    // setup channel
+    DataChannel channel;
+    channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
+    channel.setPartitionKey(groupbyNode.getGroupingColumns());
+    channel.setSchema(topMostOfFirstPhase.getOutSchema());
 
-      masterPlan.addConnect(channel);
-      currentBlock.setPlan(currentNode);
-      
-    } else {
+    // setup current block with channel
+    ScanNode scanNode = buildInputExecutor(context.plan.getLogicalPlan(), channel);
+    groupbyNode.setChild(scanNode);
+    currentBlock.setPlan(groupbyNode);
+    context.plan.addConnect(channel);
 
-      GroupbyNode firstGroupBy = PlannerUtil.transformGroupbyTo2P(groupByNode);
-      firstGroupBy.setHavingCondition(null);
+    return currentBlock;
+  }
 
-      if (firstGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
-          ((TableSubQueryNode)firstGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+  private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
+                                      GroupbyNode groupbyNode)
+      throws PlanningException {
 
-        UnionNode unionNode = PlannerUtil.findTopNode(groupByNode, NodeType.UNION);
-        ConsecutiveUnionFinder finder = new ConsecutiveUnionFinder();
-        UnionsFinderContext finderContext = new UnionsFinderContext();
-        finder.visitChild(finderContext, masterPlan.getLogicalPlan(), unionNode, new Stack<LogicalNode>());
+    MasterPlan masterPlan = context.plan;
+    ExecutionBlock currentBlock;
 
-        currentBlock = masterPlan.newExecutionBlock();
-        GroupbyNode secondGroupBy = groupByNode;
-        for (UnionNode union : finderContext.unionList) {
-          TableSubQueryNode leftSubQuery = union.getLeftChild();
-          TableSubQueryNode rightSubQuery = union.getRightChild();
-          DataChannel dataChannel;
-          if (leftSubQuery.getSubQuery().getType() != NodeType.UNION) {
-            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-            g1.setChild(leftSubQuery);
-            execBlock.setPlan(g1);
-            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
-            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-            secondGroupBy.setChild(scanNode);
-            masterPlan.addConnect(dataChannel);
-          }
-          if (rightSubQuery.getSubQuery().getType() != NodeType.UNION) {
-            ExecutionBlock execBlock = masterPlan.newExecutionBlock();
-            GroupbyNode g1 = PlannerUtil.clone(firstGroupBy);
-            g1.setChild(rightSubQuery);
-            execBlock.setPlan(g1);
-            dataChannel = new DataChannel(execBlock, currentBlock, HASH_PARTITION, 32);
-
-            ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-            secondGroupBy.setChild(scanNode);
-            masterPlan.addConnect(dataChannel);
+    if (groupbyNode.isDistinct()) {
+      return buildDistinctGroupBy(context, childBlock, groupbyNode);
+    } else {
+      GroupbyNode firstPhaseGroupBy = PlannerUtil.transformGroupbyTo2P(groupbyNode);
+      firstPhaseGroupBy.setHavingCondition(null);
+
+      if (firstPhaseGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
+          ((TableSubQueryNode)firstPhaseGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+
+        currentBlock = childBlock;
+        for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
+          if (firstPhaseGroupBy.isEmptyGrouping()) {
+            dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 1);
+          } else {
+            dataChannel.setPartition(HASH_PARTITION, firstPhaseGroupBy.getGroupingColumns(), 32);
           }
-        }
-        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-        if (parent instanceof UnaryNode && parent != secondGroupBy) {
-          ((UnaryNode)parent).setChild(secondGroupBy);
-        }
-        currentBlock.setPlan(currentNode);
-      } else {
+          dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
 
-        if (childBlock == null) { // first repartition node
-          childBlock = masterPlan.newExecutionBlock();
-        }
-        childBlock.setPlan(firstGroupBy);
+          ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+          GroupbyNode g1 = PlannerUtil.clone(firstPhaseGroupBy);
+          g1.setChild(subBlock.getPlan());
+          subBlock.setPlan(g1);
 
+          GroupbyNode g2 = PlannerUtil.clone(groupbyNode);
+          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
+          g2.setChild(scanNode);
+          currentBlock.setPlan(g2);
+        }
+      } else { // general hash-shuffled aggregation
+        childBlock.setPlan(firstPhaseGroupBy);
         currentBlock = masterPlan.newExecutionBlock();
 
         DataChannel channel;
-        if (firstGroupBy.isEmptyGrouping()) {
+        if (firstPhaseGroupBy.isEmptyGrouping()) {
           channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 1);
-          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         } else {
           channel = new DataChannel(childBlock, currentBlock, HASH_PARTITION, 32);
-          channel.setPartitionKey(firstGroupBy.getGroupingColumns());
+          channel.setPartitionKey(firstPhaseGroupBy.getGroupingColumns());
         }
-        channel.setSchema(firstGroupBy.getOutSchema());
+        channel.setSchema(firstPhaseGroupBy.getOutSchema());
 
-        GroupbyNode secondGroupBy = groupByNode;
         ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-        secondGroupBy.setChild(scanNode);
-
-        LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-        if (parent instanceof UnaryNode && parent != secondGroupBy) {
-          ((UnaryNode)parent).setChild(secondGroupBy);
-        }
-
+        groupbyNode.setChild(scanNode);
+        groupbyNode.setInSchema(scanNode.getOutSchema());
+        currentBlock.setPlan(groupbyNode);
         masterPlan.addConnect(channel);
-        currentBlock.setPlan(currentNode);
       }
     }
 
-    return new ExecutionBlock [] {currentBlock, childBlock};
+    return currentBlock;
   }
 
-  private ExecutionBlock [] buildSortPlan(MasterPlan masterPlan, LogicalNode lastDistNode, LogicalNode currentNode,
-                                          LogicalNode childNode, ExecutionBlock childBlock) {
-    ExecutionBlock currentBlock = null;
+  private ExecutionBlock buildSortPlan(MasterPlan masterPlan, ExecutionBlock childBlock, SortNode currentNode) {
+    ExecutionBlock currentBlock;
 
-    SortNode firstSort = (SortNode) lastDistNode;
-    if (childBlock == null) {
-      childBlock = masterPlan.newExecutionBlock();
-    }
-    childBlock.setPlan(firstSort);
+    SortNode firstSortNode = PlannerUtil.clone(currentNode);
+    childBlock.setPlan(firstSortNode);
 
     currentBlock = masterPlan.newExecutionBlock();
     DataChannel channel = new DataChannel(childBlock, currentBlock, RANGE_PARTITION, 32);
-    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(firstSort.getSortKeys()).toArray());
-    channel.setSchema(childNode.getOutSchema());
+    channel.setPartitionKey(PlannerUtil.sortSpecsToSchema(currentNode.getSortKeys()).toArray());
+    channel.setSchema(currentNode.getOutSchema());
 
-    SortNode secondSort = PlannerUtil.clone(lastDistNode);
     ScanNode secondScan = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-    secondSort.setChild(secondScan);
-
-    LimitNode limitAndSort;
-    LimitNode limitOrNull = PlannerUtil.findTopNode(currentNode, NodeType.LIMIT);
-    if (limitOrNull != null) {
-      limitAndSort = PlannerUtil.clone(limitOrNull);
-      limitAndSort.setChild(firstSort);
-
-      if (childBlock.getPlan().getType() == NodeType.SORT) {
-        childBlock.setPlan(limitAndSort);
-      } else {
-        LogicalNode sortParent = PlannerUtil.findTopParentNode(childBlock.getPlan(), NodeType.SORT);
-        if (sortParent != null) {
-          if (sortParent instanceof UnaryNode) {
-            ((UnaryNode)sortParent).setChild(limitAndSort);
-          }
-        }
-      }
-    }
-
-    LogicalNode parent = PlannerUtil.findTopParentNode(currentNode, lastDistNode.getType());
-    if (parent instanceof UnaryNode && parent != secondSort) {
-      ((UnaryNode)parent).setChild(secondSort);
-    }
-
-    masterPlan.addConnect(channel);
+    currentNode.setChild(secondScan);
+    currentNode.setInSchema(secondScan.getOutSchema());
     currentBlock.setPlan(currentNode);
+    masterPlan.addConnect(channel);
 
-    return new ExecutionBlock[] { currentBlock, childBlock };
+    return currentBlock;
   }
 
   public class DistributedPlannerVisitor extends BasicLogicalPlanVisitor<GlobalPlanContext, LogicalNode> {
@@ -405,35 +299,54 @@ public class GlobalPlanner {
     @Override
     public LogicalNode visitRoot(GlobalPlanContext context, LogicalPlan plan, LogicalRootNode node,
                                  Stack<LogicalNode> stack) throws PlanningException {
-      super.visitRoot(context, plan, node, stack);
-
-      if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost, context.topMostLeftExecBlock);
-      } else if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() == NodeType.UNION) {
-
-      } else {
-        ExecutionBlock execBlock = context.plan.newExecutionBlock();
-        execBlock.setPlan(node);
-        context.topMostLeftExecBlock = execBlock;
-      }
-
-      context.topmost = node;
-      return node;
+      LogicalNode child = super.visitRoot(context, plan, node, stack);
+      return child;
     }
 
     @Override
     public LogicalNode visitProjection(GlobalPlanContext context, LogicalPlan plan, ProjectionNode node,
                                        Stack<LogicalNode> stack) throws PlanningException {
-      super.visitProjection(context, plan, node, stack);
-      context.topmost = node;
-      return node;
+      LogicalNode child = super.visitProjection(context, plan, node, stack);
+
+      return handleUnaryNode(context, child, node);
     }
 
     @Override
     public LogicalNode visitLimit(GlobalPlanContext context, LogicalPlan plan, LimitNode node, Stack<LogicalNode> stack)
         throws PlanningException {
-      super.visitLimit(context, plan, node, stack);
-      context.topmost = node;
+      LogicalNode child = super.visitLimit(context, plan, node, stack);
+
+      ExecutionBlock block = null;
+      block = context.execBlockMap.remove(child.getPID());
+      if (child.getType() == NodeType.SORT) {
+        node.setChild(block.getPlan());
+        block.setPlan(node);
+
+        ExecutionBlock childBlock = context.plan.getChild(block, 0);
+        LimitNode childLimit = PlannerUtil.clone(node);
+        childLimit.setChild(childBlock.getPlan());
+        childBlock.setPlan(childLimit);
+
+        DataChannel channel = context.plan.getChannel(childBlock, block);
+        channel.setPartitionNum(1);
+        context.execBlockMap.put(node.getPID(), block);
+      } else {
+        node.setChild(block.getPlan());
+        block.setPlan(node);
+
+        ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
+        DataChannel newChannel = new DataChannel(block, newExecBlock, HASH_PARTITION, 1);
+        newChannel.setPartitionKey(new Column[]{});
+        newChannel.setSchema(node.getOutSchema());
+        ScanNode scanNode = buildInputExecutor(plan, newChannel);
+        LimitNode parentLimit = PlannerUtil.clone(node);
+        parentLimit.setChild(scanNode);
+        newExecBlock.setPlan(parentLimit);
+        context.plan.addConnect(newChannel);
+        context.execBlockMap.put(node.getPID(), newExecBlock);
+      }
+
+
       return node;
     }
 
@@ -441,15 +354,11 @@ public class GlobalPlanner {
     public LogicalNode visitSort(GlobalPlanContext context, LogicalPlan plan, SortNode node, Stack<LogicalNode> stack)
         throws PlanningException {
 
-      super.visitSort(context, plan, node, stack);
+      LogicalNode child = super.visitSort(context, plan, node, stack);
 
-      if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
-            context.topMostLeftExecBlock);
-      }
-
-      context.topmost = node;
-      context.lastRepartionableNode = node;
+      ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+      ExecutionBlock newExecBlock = buildSortPlan(context.plan, childBlock, node);
+      context.execBlockMap.put(node.getPID(), newExecBlock);
 
       return node;
     }
@@ -457,38 +366,38 @@ public class GlobalPlanner {
     @Override
     public LogicalNode visitGroupBy(GlobalPlanContext context, LogicalPlan plan, GroupbyNode node,
                                     Stack<LogicalNode> stack) throws PlanningException {
-      super.visitGroupBy(context, plan, node, stack);
+      LogicalNode child = super.visitGroupBy(context, plan, node, stack);
 
-      if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
-            context.topmost, context.topMostLeftExecBlock);
-      }
+      ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+      ExecutionBlock newExecBlock = buildGroupBy(context, childBlock, node);
+      context.execBlockMap.put(node.getPID(), newExecBlock);
 
-      context.topmost = node;
-      context.lastRepartionableNode = node;
       return node;
     }
 
     @Override
     public LogicalNode visitFilter(GlobalPlanContext context, LogicalPlan plan, SelectionNode node,
                                    Stack<LogicalNode> stack) throws PlanningException {
-      super.visitFilter(context, plan, node, stack);
-      context.topmost = node;
+      LogicalNode child = super.visitFilter(context, plan, node, stack);
+
+      ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+      childBlock.setPlan(child);
+      context.execBlockMap.put(node.getPID(), childBlock);
+
       return node;
     }
 
     @Override
     public LogicalNode visitJoin(GlobalPlanContext context, LogicalPlan plan, JoinNode node, Stack<LogicalNode> stack)
         throws PlanningException {
-      super.visitJoin(context, plan, node, stack);
+      LogicalNode leftChild = visitChild(context, plan, node.getLeftChild(), stack);
+      LogicalNode rightChild = visitChild(context, plan, node.getRightChild(), stack);
 
-      if (context.lastRepartionableNode != null) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node, context.topmost,
-            context.topMostLeftExecBlock);
-      }
+      ExecutionBlock leftChildBlock = context.execBlockMap.get(leftChild.getPID());
+      ExecutionBlock rightChildBlock = context.execBlockMap.get(rightChild.getPID());
 
-      context.topmost = node;
-      context.lastRepartionableNode = node;
+      ExecutionBlock newExecBlock = buildJoinPlan(context, node, leftChildBlock, rightChildBlock);
+      context.execBlockMap.put(node.getPID(), newExecBlock);
 
       return node;
     }
@@ -496,54 +405,105 @@ public class GlobalPlanner {
     @Override
     public LogicalNode visitUnion(GlobalPlanContext context, LogicalPlan plan, UnionNode node,
                                   Stack<LogicalNode> stack) throws PlanningException {
-      super.visitUnion(context, plan, node, stack);
+      stack.push(node);
+      LogicalNode leftChild = visitChild(context, plan, node.getLeftChild(), stack);
+      LogicalNode rightChild = visitChild(context, plan, node.getRightChild(), stack);
+      stack.pop();
 
-      if (context.lastRepartionableNode != null && context.lastRepartionableNode.getType() != NodeType.UNION) {
-        context.topMostLeftExecBlock = buildRepartitionBlocks(context.plan, context.lastRepartionableNode, node,
-            context.topmost, context.topMostLeftExecBlock);
+      List<ExecutionBlock> unionBlocks = Lists.newArrayList();
+      List<ExecutionBlock> queryBlockBlocks = Lists.newArrayList();
+
+      ExecutionBlock leftBlock = context.execBlockMap.remove(leftChild.getPID());
+      ExecutionBlock rightBlock = context.execBlockMap.remove(rightChild.getPID());
+      if (leftChild.getType() == NodeType.UNION) {
+        unionBlocks.add(leftBlock);
+      } else {
+        queryBlockBlocks.add(leftBlock);
+      }
+      if (rightChild.getType() == NodeType.UNION) {
+        unionBlocks.add(rightBlock);
+      } else {
+        queryBlockBlocks.add(rightBlock);
       }
 
-      context.topmost = node;
-      context.lastRepartionableNode = node;
+      ExecutionBlock execBlock;
+      if (unionBlocks.size() == 0) {
+        execBlock = context.plan.newExecutionBlock();
+      } else {
+        execBlock = unionBlocks.get(0);
+      }
+
+      for (ExecutionBlock childBlocks : unionBlocks) {
+        UnionNode union = (UnionNode) childBlocks.getPlan();
+        queryBlockBlocks.add(context.execBlockMap.get(union.getLeftChild().getPID()));
+        queryBlockBlocks.add(context.execBlockMap.get(union.getRightChild().getPID()));
+      }
+
+      for (ExecutionBlock childBlocks : queryBlockBlocks) {
+        DataChannel channel = new DataChannel(childBlocks, execBlock, NONE_PARTITION, 1);
+        context.plan.addConnect(channel);
+      }
+
+      context.execBlockMap.put(node.getPID(), execBlock);
+
+      return node;
+    }
+
+    private LogicalNode handleUnaryNode(GlobalPlanContext context, LogicalNode child, LogicalNode node) {
+      ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
+      execBlock.setPlan(node);
+      context.execBlockMap.put(node.getPID(), execBlock);
+
       return node;
     }
 
     @Override
     public LogicalNode visitExcept(GlobalPlanContext context, LogicalPlan plan, ExceptNode node,
                                    Stack<LogicalNode> stack) throws PlanningException {
-      super.visitExcept(context, plan, node, stack);
+      LogicalNode child = super.visitExcept(context, plan, node, stack);
       context.topmost = node;
-      return node;
+
+      return handleUnaryNode(context, child, node);
     }
 
     @Override
     public LogicalNode visitIntersect(GlobalPlanContext context, LogicalPlan plan, IntersectNode node,
                                       Stack<LogicalNode> stack) throws PlanningException {
-      super.visitIntersect(context, plan, node, stack);
+      LogicalNode child = super.visitIntersect(context, plan, node, stack);
       context.topmost = node;
-      return node;
+
+      return handleUnaryNode(context, child, node);
     }
 
     @Override
     public LogicalNode visitTableSubQuery(GlobalPlanContext context, LogicalPlan plan, TableSubQueryNode node,
                                           Stack<LogicalNode> stack) throws PlanningException {
-      super.visitTableSubQuery(context, plan, node, stack);
-      context.topmost = node;
-      return node;
+      LogicalNode child = super.visitTableSubQuery(context, plan, node, stack);
+
+      return handleUnaryNode(context, child, node);
     }
 
     @Override
     public LogicalNode visitScan(GlobalPlanContext context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
         throws PlanningException {
       context.topmost = node;
+
+      ExecutionBlock newExecBlock = context.plan.newExecutionBlock();
+      newExecBlock.setPlan(node);
+      context.execBlockMap.put(node.getPID(), newExecBlock);
       return node;
     }
 
     @Override
     public LogicalNode visitStoreTable(GlobalPlanContext context, LogicalPlan plan, StoreTableNode node,
                                        Stack<LogicalNode> stack) throws PlanningException {
-      super.visitStoreTable(context, plan, node, stack);
+      LogicalNode child = super.visitStoreTable(context, plan, node, stack);
       context.topmost = node;
+
+      ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
+      execBlock.setPlan(node);
+      context.execBlockMap.put(node.getPID(), execBlock);
+
       return node;
     }
 
@@ -551,8 +511,13 @@ public class GlobalPlanner {
     public LogicalNode visitInsert(GlobalPlanContext context, LogicalPlan plan, InsertNode node,
                                    Stack<LogicalNode> stack)
         throws PlanningException {
-      super.visitInsert(context, plan, node, stack);
+      LogicalNode child = super.visitInsert(context, plan, node, stack);
       context.topmost = node;
+
+      ExecutionBlock execBlock = context.execBlockMap.remove(child.getPID());
+      execBlock.setPlan(node);
+      context.execBlockMap.put(node.getPID(), execBlock);
+
       return node;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
index 6aaeb1d..891b452 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/MasterPlan.java
@@ -83,6 +83,7 @@ public class MasterPlan {
   
   public void setTerminal(ExecutionBlock root) {
     this.root = root;
+    this.terminalBlock = root;
   }
   
   public ExecutionBlock getRoot() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
index a246c0e..b604fac 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/LimitNode.java
@@ -35,7 +35,7 @@ public final class LimitNode extends UnaryNode implements Cloneable {
 
   @Override
   public PlanString getPlanString() {
-    return new PlanString("Limit");
+    return new PlanString("Limit " + fetchFirstNum);
   }
   
   @Override 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index 846ed41..d1f0986 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -119,6 +119,6 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
   }
 
   public String toString() {
-    return "Table Subquery (alias = " + tableName + ")\n" + subQuery.toString();
+    return "(" + getPID() + ") Table Subquery (alias = " + tableName + ")\n" + subQuery.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
index a50d44f..5ae34f7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/FoundJoinOrder.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.engine.planner.logical.join;
 
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 
 /**
@@ -28,12 +27,10 @@ import org.apache.tajo.engine.planner.logical.JoinNode;
 @InterfaceStability.Evolving
 public class FoundJoinOrder {
   private JoinNode joinNode;
-  private EvalNode[] quals;
   private double cost;
 
-  public FoundJoinOrder(JoinNode joinNode, EvalNode[] quals, double cost) {
+  public FoundJoinOrder(JoinNode joinNode, double cost) {
     this.joinNode = joinNode;
-    this.quals = quals;
     this.cost = cost;
   }
 
@@ -47,14 +44,4 @@ public class FoundJoinOrder {
   public double getCost() {
     return cost;
   }
-
-  /**
-   * The search conditions contained in where clause are pushed down into join operators.
-   * This method returns the remain search conditions except for pushed down condition.
-   *
-   * @return the remain search conditions
-   */
-  public EvalNode [] getRemainConditions() {
-    return this.quals;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
index d80393b..5ac8b57 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -18,25 +18,21 @@
 
 package org.apache.tajo.engine.planner.logical.join;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.logical.JoinNode;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.logical.RelationNode;
-import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.util.TUtil;
 
-import java.util.*;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
- * This is a greedy heuristic algorithm to find a left-deep join tree. This algorithm finds
- * the best join order with join conditions and pushes down join conditions to
+ * This is a greedy heuristic algorithm to find a bushy join tree. This algorithm finds
+ * the best join order with join conditions and pushes join conditions down to
  * all join operators.
  */
 public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
@@ -44,246 +40,197 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
 
   @Override
   public FoundJoinOrder findBestOrder(LogicalPlan plan, LogicalPlan.QueryBlock block, JoinGraph joinGraph,
-                                         Set<EvalNode> qualSet, Set<String> relationsWithoutQual) {
+                                      Set<String> relationsWithoutQual) throws PlanningException {
 
-    // Build a map (a relation name -> a relation object)
-    // initialize a relation set
-    HashMap<String, RelationNode> relationMap = Maps.newHashMap();
-    // Remain relation set to be joined
-    Set<String> remainRelNames = new HashSet<String>();
+    // Set up the set of remaining relations to be joined
+    Set<LogicalNode> remainRelations = new HashSet<LogicalNode>();
     for (RelationNode relation : block.getRelations()) {
-      RelationNode rel = relation;
-      relationMap.put(rel.getCanonicalName(), rel);
-      remainRelNames.add(rel.getCanonicalName());
+      remainRelations.add(relation);
     }
-    // A set of already-joined relations
-    Set<String> alreadyJoinedRelNames = new HashSet<String>();
-
-    // Get candidates from all relations to be joined
-    List<RelationNode> candidates = getSmallerRelations(relationMap, remainRelNames);
-
-    LogicalNode lastOne = candidates.get(0); // Get the first candidate relation (i.e., smallest)
-    remainRelNames.remove(((RelationNode)lastOne).getCanonicalName()); // Remove the first candidate relation
-
-    // Add the first candidate to the set of joined relations
-    alreadyJoinedRelNames.add(((RelationNode)lastOne).getCanonicalName());
-
-    List<CandidateJoinNode> orderedJoins;
-    CandidateJoinNode chosen = null;
-    while(true) {
-      // Get a set of relations that can be joined to the composite relation.
-      Set<CandidateJoin> joinCandidates = new HashSet<CandidateJoin>();
-      for (String currentRel : alreadyJoinedRelNames) {
-        // find all relations that can be joined to this relation
-        Collection<JoinEdge> edges = joinGraph.getJoinsWith(currentRel);
-
-        if (edges.size() > 0) { // if there are available join quals
-          for (JoinEdge edge : edges) {
-            if (alreadyJoinedRelNames.contains(edge.getLeftRelation())
-                && currentRel.contains(edge.getRightRelation())) { // if two relations are already joined
-              continue;
-            }
-
-            if (!alreadyJoinedRelNames.contains(edge.getLeftRelation())) {
-              joinCandidates.add(new CandidateJoin(edge.getLeftRelation(), edge)) ;
-            }
-            if (!alreadyJoinedRelNames.contains(edge.getRightRelation())) {
-              joinCandidates.add(new CandidateJoin(edge.getRightRelation(), edge));
-            }
-          }
-        } else {
-          for (RelationNode rel : block.getRelations()) {
-            // Add all relations except for itself to join candidates
-            if (!currentRel.equals(rel.getCanonicalName())) {
-              joinCandidates.add(new CandidateJoin(rel.getCanonicalName(),
-                  new JoinEdge(JoinType.CROSS, currentRel, rel.getCanonicalName())));
-            }
-          }
-        }
-      }
 
-      // Get a ranked candidates from the set of relations that can be joined
-      orderedJoins = getBestJoinRelations(relationMap, lastOne, joinCandidates);
-
-      // Get a candidate relation such that the candidate incurs the smallest intermediate and is
-      // not join to any relation yet.
-
-      for (int i = 0; i < orderedJoins.size(); i++) {
-        chosen = orderedJoins.get(i);
-        if (remainRelNames.contains(chosen.getRelation().getCanonicalName())) {
-          break;
-        }
-      }
+    LogicalNode latestJoin;
+    JoinEdge bestPair;
 
-      // Set the candidate to a inner relation and remove from the relation set.
-      JoinNode lastJoinNode = new JoinNode(plan.newPID(), chosen.getJoinType());
-      lastJoinNode.setLeftChild(lastOne); // Set the first candidate to a left relation of the first join
-      lastJoinNode.setRightChild(chosen.getRelation());
+    while (remainRelations.size() > 1) {
+      // Find the best join pair among all joinable operators in candidate set.
+      bestPair = getBestPair(plan, joinGraph, remainRelations);
 
-      Schema merged = SchemaUtil.merge(lastJoinNode.getLeftChild().getOutSchema(),
-          lastJoinNode.getRightChild().getOutSchema());
-      lastJoinNode.setInSchema(merged);
-      lastJoinNode.setOutSchema(merged);
+      remainRelations.remove(bestPair.getLeftRelation()); // remainRels = remainRels \ Ti
+      remainRelations.remove(bestPair.getRightRelation()); // remainRels = remainRels \ Tj
 
-      if (chosen.hasJoinQual()) {
-        lastJoinNode.setJoinQual(EvalTreeUtil.transformCNF2Singleton(chosen.getJoinQual()));
-        for (EvalNode joinCondition : chosen.getJoinQual()) {
-          qualSet.remove(joinCondition);
-        }
-      }
-      lastJoinNode.setCost(getCost(chosen));
-      alreadyJoinedRelNames.add(chosen.getRelation().getCanonicalName());
-      remainRelNames.remove(chosen.getRelation().getCanonicalName());
-      lastOne = lastJoinNode;
-
-      // If the relation set is empty, stop this loop.
-      if (remainRelNames.isEmpty()) {
-        Preconditions.checkState(qualSet.isEmpty(), "Not all join conditions are pushed down to joins.");
-        break;
-      }
+      latestJoin = createJoinNode(plan, bestPair);
+      remainRelations.add(latestJoin);
     }
 
-    return new FoundJoinOrder((JoinNode) lastOne, qualSet.toArray(new EvalNode[qualSet.size()]), getCost(chosen));
+    JoinNode joinTree = (JoinNode) remainRelations.iterator().next();
+    return new FoundJoinOrder(joinTree, getCost(joinTree));
   }
 
-  private class CandidateJoin {
-    final JoinEdge edge;
-    final String relationName;
-
-    public CandidateJoin(String rightRelation, JoinEdge edge) {
-      this.relationName = rightRelation;
-      this.edge = edge;
-    }
-
-    public String getRelationName() {
-      return relationName;
-    }
+  private static JoinNode createJoinNode(LogicalPlan plan, JoinEdge joinEdge) {
+    LogicalNode left = joinEdge.getLeftRelation();
+    LogicalNode right = joinEdge.getRightRelation();
+    JoinNode joinNode;
 
-    public JoinEdge getEdge() {
-      return edge;
+    if (PlannerUtil.isCommutativeJoin(joinEdge.getJoinType())) {
+      // if only the left operator is a relation
+      if ((left instanceof RelationNode) && !(right instanceof RelationNode)) {
+        // for left deep
+        joinNode =  new JoinNode(plan.newPID(), joinEdge.getJoinType(), right, left);
+      } else {
+        // if both operators are relations, if neither is a relation, or if only the right one is,
+        // we don't need to care about the left-right position.
+        joinNode = new JoinNode(plan.newPID(), joinEdge.getJoinType(), left, right);
+      }
+    } else {
+      joinNode = new JoinNode(plan.newPID(), joinEdge.getJoinType(), left, right);
     }
 
-    public String toString() {
-      return edge.toString();
+    Schema mergedSchema = SchemaUtil.merge(joinNode.getLeftChild().getOutSchema(),
+        joinNode.getRightChild().getOutSchema());
+    joinNode.setInSchema(mergedSchema);
+    joinNode.setOutSchema(mergedSchema);
+    if (joinEdge.hasJoinQual()) {
+      joinNode.setJoinQual(EvalTreeUtil.transformCNF2Singleton(joinEdge.getJoinQual()));
     }
+    return joinNode;
   }
 
-  private class CandidateJoinNode {
-    final LogicalNode relOrJoin;
-    final RelationNode rel;
-    final JoinEdge joinEdge;
-
-    public CandidateJoinNode(LogicalNode relOrJoin, RelationNode relation, JoinEdge joinEdge) {
-      this.relOrJoin = relOrJoin;
-      this.rel = relation;
-      this.joinEdge = joinEdge;
-    }
-
-    public LogicalNode getRelationOrJoin() {
-      return relOrJoin;
-    }
+  /**
+   * Find the best join pair among all joinable operators in candidate set.
+   *
+   * @param plan a logical plan
+   * @param graph a join graph which consists of vertices and edges, where vertex is relation and
+   *              each edge is join condition.
+   * @param candidateSet candidate operators to be joined.
+   * @return The best join pair among them
+   * @throws PlanningException
+   */
+  private JoinEdge getBestPair(LogicalPlan plan, JoinGraph graph, Set<LogicalNode> candidateSet)
+      throws PlanningException {
+    double minCost = Double.MAX_VALUE;
+    JoinEdge bestJoin = null;
+
+    double minNonCrossJoinCost = Double.MAX_VALUE;
+    JoinEdge bestNonCrossJoin = null;
+
+    for (LogicalNode outer : candidateSet) {
+      for (LogicalNode inner : candidateSet) {
+        if (outer.equals(inner)) {
+          continue;
+        }
 
-    public RelationNode getRelation() {
-      return rel;
-    }
+        JoinEdge foundJoin = findJoin(plan, graph, outer, inner);
+        if (foundJoin == null) {
+          continue;
+        }
+        double cost = getCost(foundJoin);
 
-    public JoinType getJoinType() {
-      return joinEdge.getJoinType();
-    }
+        if (cost < minCost) {
+          minCost = cost;
+          bestJoin = foundJoin;
+        }
 
-    public boolean hasJoinQual() {
-      return joinEdge.hasJoinQual() && joinEdge.getJoinQual().length > 0;
+        // Keep the min cost join
+        // But, if there exists a qualified join, the qualified join must be chosen
+        // rather than cross join regardless of cost.
+        if (foundJoin.hasJoinQual()) {
+          if (cost < minNonCrossJoinCost) {
+            minNonCrossJoinCost = cost;
+            bestNonCrossJoin = foundJoin;
+          }
+        }
+      }
     }
 
-    public EvalNode [] getJoinQual() {
-      return joinEdge.getJoinQual();
+    if (bestNonCrossJoin != null) {
+      return bestNonCrossJoin;
+    } else {
+      return bestJoin;
     }
+  }
 
-    public String toString() {
-      StringBuilder sb = new StringBuilder(rel.getCanonicalName());
-      if (hasJoinQual()) {
-        sb.append("with ").append(TUtil.arrayToString(joinEdge.getJoinQual()));
+  /**
+   * Find a join between two logical operator trees
+   *
+   * @return If there is no join condition between the two relations, it returns a CROSS join edge.
+   */
+  private static JoinEdge findJoin(LogicalPlan plan, JoinGraph graph, LogicalNode outer, LogicalNode inner) throws PlanningException {
+    JoinEdge foundJoinEdge = null;
+
+    for (String outerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, outer)) {
+      for (String innerName : PlannerUtil.getRelationLineageWithinQueryBlock(plan, inner)) {
+
+        // Find all joins between two relations and merge them into one join if possible
+        if (graph.hasEdge(outerName, innerName)) {
+          JoinEdge existJoinEdge = graph.getEdge(outerName, innerName);
+          foundJoinEdge = new JoinEdge(existJoinEdge.getJoinType(), outer, inner, existJoinEdge.getJoinQual());
+        }
       }
-      return sb.toString();
     }
-  }
 
-  private List<RelationNode> getSmallerRelations(Map<String, RelationNode> map, Set<String> relationSet) {
-    List<RelationNode> relations = new ArrayList<RelationNode>();
-    for (String name : relationSet) {
-      relations.add(map.get(name));
+    if (foundJoinEdge == null) {
+      foundJoinEdge = new JoinEdge(JoinType.CROSS, outer, inner);
     }
-    Collections.sort(relations, new RelationOpComparator());
-    return relations;
+
+    return foundJoinEdge;
   }
 
-  private List<CandidateJoinNode> getBestJoinRelations(Map<String, RelationNode> map,
-                                                       LogicalNode lastJoin,
-                                                       Set<CandidateJoin> candidateJoins) {
-    List<CandidateJoinNode> relations = new ArrayList<CandidateJoinNode>();
-    for (CandidateJoin candidate : candidateJoins) {
-      relations.add(new CandidateJoinNode(lastJoin, map.get(candidate.getRelationName()), candidate.getEdge()));
+  /**
+   * Getting a cost of one join
+   * @param joinEdge
+   * @return
+   */
+  public static double getCost(JoinEdge joinEdge) {
+    double filterFactor = 1;
+    if (joinEdge.hasJoinQual()) {
+      // TODO - should consider join type
+      // TODO - should use statistics obtained from query history
+      filterFactor = filterFactor * Math.pow(DEFAULT_SELECTION_FACTOR, joinEdge.getJoinQual().length);
+      return getCost(joinEdge.getLeftRelation()) * getCost(joinEdge.getRightRelation()) * filterFactor;
+    } else {
+      // make cost bigger if cross join
+      return Math.pow(getCost(joinEdge.getLeftRelation()) * getCost(joinEdge.getRightRelation()), 2);
     }
-    Collections.sort(relations, new CandidateJoinOpComparator());
-    return relations;
   }
 
   public static double getCost(LogicalNode node) {
-    if (node instanceof ScanNode) {
-      ScanNode scanNode = (ScanNode) node;
-      if (scanNode.getTableDesc().getStats() != null) {
-        return ((ScanNode)node).getTableDesc().getStats().getNumBytes();
+    switch (node.getType()) {
+
+    case PROJECTION:
+      ProjectionNode projectionNode = (ProjectionNode) node;
+      return getCost(projectionNode.getChild());
+
+    case JOIN:
+      JoinNode joinNode = (JoinNode) node;
+      double filterFactor = 1;
+      if (joinNode.hasJoinQual()) {
+        filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR,
+            EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()).length);
+        return getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()) * filterFactor;
       } else {
-        return Long.MAX_VALUE;
+        return Math.pow(getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()), 2);
       }
-    } else {
-      return node.getCost();
-    }
-  }
 
-  /** it assumes that left-deep join tree. */
-  public static double getCost(LogicalNode left, LogicalNode right, EvalNode [] quals) {
-    double filterFactor = 1;
-    if (quals != null) {
-      filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR, quals.length);
-    }
+    case SELECTION:
+      SelectionNode selectionNode = (SelectionNode) node;
+      return getCost(selectionNode.getChild()) *
+          Math.pow(DEFAULT_SELECTION_FACTOR, EvalTreeUtil.getConjNormalForm(selectionNode.getQual()).length);
 
-    if (left instanceof RelationNode) {
-      return getCost(left) * getCost(right) * filterFactor;
-    } else {
-      return getCost(left)
-          + (getCost(left) * getCost(right) * filterFactor);
-    }
-  }
-
-  private double getCost(CandidateJoinNode join) {
-    return getCost(join.getRelationOrJoin(), join.getRelation(), join.getJoinQual());
-  }
+    case TABLE_SUBQUERY:
+      TableSubQueryNode subQueryNode = (TableSubQueryNode) node;
+      return getCost(subQueryNode.getSubQuery());
 
-  class RelationOpComparator implements Comparator<RelationNode> {
-    @Override
-    public int compare(RelationNode o1, RelationNode o2) {
-      if (getCost(o1) < getCost(o2)) {
-        return -1;
-      } else if (getCost(o1) > getCost(o2)) {
-        return 1;
+    case SCAN:
+      ScanNode scanNode = (ScanNode) node;
+      if (scanNode.getTableDesc().getStats() != null) {
+        double cost = ((ScanNode)node).getTableDesc().getStats().getNumBytes();
+        return cost;
       } else {
-        return 0;
+        return Long.MAX_VALUE;
       }
-    }
-  }
 
-  class CandidateJoinOpComparator implements Comparator<CandidateJoinNode> {
-    @Override
-    public int compare(CandidateJoinNode o1, CandidateJoinNode o2) {
-      double cmp = getCost(o1) - getCost(o2);
-      if (cmp < 0) {
-        return -1;
-      } else if (getCost(o1.getRelation()) > getCost(o2.getRelation())) {
-        return 1;
-      } else {
-        return 0;
-      }
+    default:
+      return getCost(node);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
index 844bbf2..e5c29f0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinEdge.java
@@ -21,6 +21,8 @@ package org.apache.tajo.engine.planner.logical.join;
 import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.RelationNode;
 import org.apache.tajo.util.TUtil;
 
 import java.util.Collections;
@@ -28,17 +30,18 @@ import java.util.Set;
 
 public class JoinEdge {
   private final JoinType joinType;
-  private final String leftRelation;
-  private final String rightRelation;
+  private final LogicalNode leftRelation;
+  private final LogicalNode rightRelation;
   private final Set<EvalNode> joinQual = Sets.newHashSet();
 
-  public JoinEdge(JoinType joinType, String leftRelation, String rightRelation) {
+  public JoinEdge(JoinType joinType, LogicalNode leftRelation, LogicalNode rightRelation) {
     this.joinType = joinType;
     this.leftRelation = leftRelation;
     this.rightRelation = rightRelation;
   }
 
-  public JoinEdge(JoinType joinType, String leftRelation, String rightRelation, EvalNode ... condition) {
+  public JoinEdge(JoinType joinType, LogicalNode leftRelation, LogicalNode rightRelation,
+                  EvalNode ... condition) {
     this(joinType, leftRelation, rightRelation);
     Collections.addAll(joinQual, condition);
   }
@@ -47,16 +50,16 @@ public class JoinEdge {
     return joinType;
   }
 
-  public String getLeftRelation() {
+  public LogicalNode getLeftRelation() {
     return leftRelation;
   }
 
-  public String getRightRelation() {
+  public LogicalNode getRightRelation() {
     return rightRelation;
   }
 
   public boolean hasJoinQual() {
-    return joinQual != null;
+    return joinQual.size() > 0;
   }
 
   public void addJoinQual(EvalNode joinQual) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/39fe4d76/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index 2e600ba..66c82f3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -19,40 +19,50 @@
 package org.apache.tajo.engine.planner.logical.join;
 
 import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
 import org.apache.tajo.engine.planner.graph.SimpleUndirectedGraph;
+import org.apache.tajo.engine.planner.logical.JoinNode;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
 public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
-  public Collection<JoinEdge> getJoinsWith(String relation) {
-    return getEdges(relation);
-  }
-
-  public Collection<EvalNode> addJoin(JoinType joinType, EvalNode joinQual) {
-    Set<EvalNode> cnf = Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinQual));
+  public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      JoinNode joinNode) throws PlanningException {
+    Set<EvalNode> cnf = Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()));
     Set<EvalNode> nonJoinQuals = Sets.newHashSet();
     for (EvalNode singleQual : cnf) {
       if (PlannerUtil.isJoinQual(singleQual)) {
-        List<Column> left = EvalTreeUtil.findAllColumnRefs(singleQual.getLeftExpr());
-        List<Column> right = EvalTreeUtil.findAllColumnRefs(singleQual.getRightExpr());
+        List<Column> leftExpr = EvalTreeUtil.findAllColumnRefs(singleQual.getLeftExpr());
+        List<Column> rightExpr = EvalTreeUtil.findAllColumnRefs(singleQual.getRightExpr());
+
+        String leftExprRelation = leftExpr.get(0).getQualifier();
+        String rightExprRelName = rightExpr.get(0).getQualifier();
 
-        String leftRelName = left.get(0).getQualifier();
-        String rightRelName = right.get(0).getQualifier();
+        Collection<String> leftLineage = PlannerUtil.getRelationLineageWithinQueryBlock(plan, joinNode.getLeftChild());
 
-        JoinEdge edge = getEdge(leftRelName, rightRelName);
+        boolean isLeftExprForLeftTable = leftLineage.contains(leftExprRelation);
+        JoinEdge edge;
+        edge = getEdge(leftExprRelation, rightExprRelName);
 
         if (edge != null) {
           edge.addJoinQual(singleQual);
         } else {
-          edge = new JoinEdge(joinType, leftRelName, rightRelName, singleQual);
-          addEdge(leftRelName, rightRelName, edge);
+          if (isLeftExprForLeftTable) {
+            edge = new JoinEdge(joinNode.getJoinType(),
+                block.getRelation(leftExprRelation), block.getRelation(rightExprRelName), singleQual);
+            addEdge(leftExprRelation, rightExprRelName, edge);
+          } else {
+            edge = new JoinEdge(joinNode.getJoinType(),
+                block.getRelation(rightExprRelName), block.getRelation(leftExprRelation), singleQual);
+            addEdge(rightExprRelName, leftExprRelation, edge);
+          }
         }
       } else {
         nonJoinQuals.add(singleQual);


Mime
View raw message