tajo-commits mailing list archives

From hyun...@apache.org
Subject [37/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)
Date Fri, 18 Apr 2014 11:44:40 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
new file mode 100644
index 0000000..2053e36
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -0,0 +1,1057 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.ObjectArrays;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.IndexUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Stack;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+import static org.apache.tajo.conf.TajoConf.ConfVars;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.GroupbyEnforce.GroupbyAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce;
+
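+/**
+ * Default {@link PhysicalPlanner} implementation. It walks a logical plan
+ * recursively and selects a physical executor for each logical node, honoring
+ * per-node algorithm hints from the query {@link Enforcer} when present and
+ * falling back to size-based heuristics otherwise.
+ */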
+public class PhysicalPlannerImpl implements PhysicalPlanner {
+  private static final Log LOG = LogFactory.getLog(PhysicalPlannerImpl.class);
+  private static final int UNGENERATED_PID = -1;
+  private final long INNER_JOIN_INMEMORY_HASH_THRESHOLD;
+
+  protected final TajoConf conf;
+  protected final AbstractStorageManager sm;
+
+  public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
+    this.conf = conf;
+    this.sm = sm;
+
+    this.INNER_JOIN_INMEMORY_HASH_THRESHOLD = conf.getLongVar(ConfVars.EXECUTOR_INNER_JOIN_INMEMORY_HASH_THRESHOLD);
+  }
+
+  public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan)
+      throws InternalException {
+
+    PhysicalExec execPlan;
+
+    try {
+      execPlan = createPlanRecursive(context, logicalPlan, new Stack<LogicalNode>());
+      if (execPlan instanceof StoreTableExec
+          || execPlan instanceof RangeShuffleFileWriteExec
+          || execPlan instanceof HashShuffleFileWriteExec
+          || execPlan instanceof ColPartitionStoreExec) {
+        return execPlan;
+      } else if (context.getDataChannel() != null) {
+        return buildOutputOperator(context, logicalPlan, execPlan);
+      } else {
+        return execPlan;
+      }
+    } catch (IOException ioe) {
+      throw new InternalException(ioe);
+    }
+  }
+
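+  /**
+   * Wraps the root physical executor with a shuffle file write executor so that
+   * the task's output is materialized in the form required by its outgoing
+   * {@link DataChannel}.
+   */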
+  private PhysicalExec buildOutputOperator(TaskAttemptContext context, LogicalNode plan,
+                                           PhysicalExec execPlan) throws IOException {
+    DataChannel channel = context.getDataChannel();
+    ShuffleFileWriteNode shuffleFileWriteNode = LogicalPlan.createNodeWithoutPID(ShuffleFileWriteNode.class);
+    shuffleFileWriteNode.setStorageType(context.getDataChannel().getStoreType());
+    shuffleFileWriteNode.setInSchema(plan.getOutSchema());
+    shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
+    shuffleFileWriteNode.setShuffle(channel.getShuffleType(), channel.getShuffleKeys(), channel.getShuffleOutputNum());
+    shuffleFileWriteNode.setChild(plan);
+
+    PhysicalExec outExecPlan = createShuffleFileWritePlan(context, shuffleFileWriteNode, execPlan);
+    return outExecPlan;
+  }
+
+  private PhysicalExec createPlanRecursive(TaskAttemptContext ctx, LogicalNode logicalNode, Stack<LogicalNode> stack)
+      throws IOException {
+    PhysicalExec leftExec;
+    PhysicalExec rightExec;
+
+    switch (logicalNode.getType()) {
+
+      case ROOT:
+        LogicalRootNode rootNode = (LogicalRootNode) logicalNode;
+        stack.push(rootNode);
+        leftExec = createPlanRecursive(ctx, rootNode.getChild(), stack);
+        stack.pop();
+        return leftExec;
+
+      case EXPRS:
+        EvalExprNode evalExpr = (EvalExprNode) logicalNode;
+        return new EvalExprExec(ctx, evalExpr);
+
+      case CREATE_TABLE:
+      case INSERT:
+      case STORE:
+        StoreTableNode storeNode = (StoreTableNode) logicalNode;
+        stack.push(storeNode);
+        leftExec = createPlanRecursive(ctx, storeNode.getChild(), stack);
+        stack.pop();
+        return createStorePlan(ctx, storeNode, leftExec);
+
+      case SELECTION:
+        SelectionNode selNode = (SelectionNode) logicalNode;
+        stack.push(selNode);
+        leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
+        stack.pop();
+        return new SelectionExec(ctx, selNode, leftExec);
+
+      case PROJECTION:
+        ProjectionNode prjNode = (ProjectionNode) logicalNode;
+        stack.push(prjNode);
+        leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
+        stack.pop();
+        return new ProjectionExec(ctx, prjNode, leftExec);
+
+      case TABLE_SUBQUERY: {
+        TableSubQueryNode subQueryNode = (TableSubQueryNode) logicalNode;
+        stack.push(subQueryNode);
+        leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery(), stack);
+        stack.pop();
+        return new ProjectionExec(ctx, subQueryNode, leftExec);
+
+      }
+
+      case PARTITIONS_SCAN:
+      case SCAN:
+        leftExec = createScanPlan(ctx, (ScanNode) logicalNode, stack);
+        return leftExec;
+
+      case GROUP_BY:
+        GroupbyNode grpNode = (GroupbyNode) logicalNode;
+        stack.push(grpNode);
+        leftExec = createPlanRecursive(ctx, grpNode.getChild(), stack);
+        stack.pop();
+        return createGroupByPlan(ctx, grpNode, leftExec);
+
+      case HAVING:
+        HavingNode havingNode = (HavingNode) logicalNode;
+        stack.push(havingNode);
+        leftExec = createPlanRecursive(ctx, havingNode.getChild(), stack);
+        stack.pop();
+        return new HavingExec(ctx, havingNode, leftExec);
+
+      case SORT:
+        SortNode sortNode = (SortNode) logicalNode;
+        stack.push(sortNode);
+        leftExec = createPlanRecursive(ctx, sortNode.getChild(), stack);
+        stack.pop();
+        return createSortPlan(ctx, sortNode, leftExec);
+
+      case JOIN:
+        JoinNode joinNode = (JoinNode) logicalNode;
+        stack.push(joinNode);
+        leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
+        rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
+        stack.pop();
+        return createJoinPlan(ctx, joinNode, leftExec, rightExec);
+
+      case UNION:
+        UnionNode unionNode = (UnionNode) logicalNode;
+        stack.push(unionNode);
+        leftExec = createPlanRecursive(ctx, unionNode.getLeftChild(), stack);
+        rightExec = createPlanRecursive(ctx, unionNode.getRightChild(), stack);
+        stack.pop();
+        return new UnionExec(ctx, leftExec, rightExec);
+
+      case LIMIT:
+        LimitNode limitNode = (LimitNode) logicalNode;
+        stack.push(limitNode);
+        leftExec = createPlanRecursive(ctx, limitNode.getChild(), stack);
+        stack.pop();
+        return new LimitExec(ctx, limitNode.getInSchema(),
+            limitNode.getOutSchema(), leftExec, limitNode);
+
+      case BST_INDEX_SCAN:
+        IndexScanNode indexScanNode = (IndexScanNode) logicalNode;
+        leftExec = createIndexScanExec(ctx, indexScanNode);
+        return leftExec;
+
+      default:
+        return null;
+    }
+  }
+
+  @VisibleForTesting
+  public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException {
+    long size = 0;
+    for (String tableId : tableIds) {
+      // TODO - CSV is a hack.
+      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV,
+          ctx.getTables(tableId));
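+      // For file fragments, the end key holds the fragment length in bytes,
+      // so summing the end keys approximates the total input volume.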
+      for (FileFragment frag : fragments) {
+        size += frag.getEndKey();
+      }
+    }
+    return size;
+  }
+
+  @VisibleForTesting
+  public boolean checkIfInMemoryInnerJoinIsPossible(TaskAttemptContext context, LogicalNode node, boolean left)
+      throws IOException {
+    String [] lineage = PlannerUtil.getRelationLineage(node);
+    long volume = estimateSizeRecursive(context, lineage);
+    boolean inMemoryInnerJoinFlag = volume <= INNER_JOIN_INMEMORY_HASH_THRESHOLD;
+    LOG.info(String.format("[%s] the volume of %s relations (%s) is %s and is %ssmall enough to fit in main memory.",
+        context.getTaskId().toString(),
+        (left ? "Left" : "Right"),
+        TUtil.arrayToString(lineage),
+        FileUtil.humanReadableByteCount(volume, false),
+        (inMemoryInnerJoinFlag ? "" : "not ")));
+    return inMemoryInnerJoinFlag;
+  }
+
+  public PhysicalExec createJoinPlan(TaskAttemptContext context, JoinNode joinNode, PhysicalExec leftExec,
+                                     PhysicalExec rightExec) throws IOException {
+
+    switch (joinNode.getJoinType()) {
+      case CROSS:
+        return createCrossJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case INNER:
+        return createInnerJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case LEFT_OUTER:
+        return createLeftOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case RIGHT_OUTER:
+        return createRightOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case FULL_OUTER:
+        return createFullOuterJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case LEFT_SEMI:
+        return createLeftSemiJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case RIGHT_SEMI:
+        return createRightSemiJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case LEFT_ANTI:
+        return createLeftAntiJoinPlan(context, joinNode, leftExec, rightExec);
+
+      case RIGHT_ANTI:
+        return createRightAntiJoinPlan(context, joinNode, leftExec, rightExec);
+
+      default:
+        throw new PhysicalPlanningException("Cannot support join type: " + joinNode.getJoinType().name());
+    }
+  }
+
+  private PhysicalExec createCrossJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        default:
+          // fallback algorithm
+          LOG.error("Invalid Cross Join Algorithm Enforcer: " + algorithm.name());
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+      }
+
+    } else {
+      return new BNLJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                           PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+
+      switch (algorithm) {
+        case NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Nested Loop Join]");
+          return new NLJoinExec(context, plan, leftExec, rightExec);
+        case BLOCK_NESTED_LOOP_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Block Nested Loop Join]");
+          return new BNLJoinExec(context, plan, leftExec, rightExec);
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+          // switchJoinSidesIfNecessary returns the two executors ordered by estimated size:
+          // the smaller one at index 0 and the larger one at index 1.
+          PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+          return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
+        case MERGE_JOIN:
+          LOG.info("Join (" + plan.getPID() +") chooses [Sort Merge Join]");
+          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+        case HYBRID_HASH_JOIN:
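+          // HYBRID_HASH_JOIN is not implemented yet; fall through to the default case.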
+
+        default:
+          LOG.error("Invalid Inner Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createMergeInnerJoin(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestInnerJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  /**
+   * Returns the two {@link org.apache.tajo.engine.planner.physical.PhysicalExec}s ordered by the
+   * estimated total volume of their child relations: the smaller side is at index 0, and
+   * the larger side is at index 1.
+   */
+  @VisibleForTesting
+  public PhysicalExec [] switchJoinSidesIfNecessary(TaskAttemptContext context, JoinNode plan,
+                                                     PhysicalExec left, PhysicalExec right) throws IOException {
+    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
+    long leftSize = estimateSizeRecursive(context, leftLineage);
+    long rightSize = estimateSizeRecursive(context, rightLineage);
+
+    PhysicalExec smaller;
+    PhysicalExec larger;
+    if (leftSize <= rightSize) {
+      smaller = left;
+      larger = right;
+      LOG.info(String.format("[%s] Left relations %s (%s) is smaller than Right relations %s (%s).",
+          context.getTaskId().toString(),
+          TUtil.arrayToString(leftLineage),
+          FileUtil.humanReadableByteCount(leftSize, false),
+          TUtil.arrayToString(rightLineage),
+          FileUtil.humanReadableByteCount(rightSize, false)));
+    } else {
+      smaller = right;
+      larger = left;
+      LOG.info(String.format("[%s] Right relations %s (%s) is smaller than Left relations %s (%s).",
+          context.getTaskId().toString(),
+          TUtil.arrayToString(rightLineage),
+          FileUtil.humanReadableByteCount(rightSize, false),
+          TUtil.arrayToString(leftLineage),
+          FileUtil.humanReadableByteCount(leftSize, false)));
+    }
+
+    return new PhysicalExec [] {smaller, larger};
+  }
+
+  private PhysicalExec createBestInnerJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    boolean inMemoryHashJoin = false;
+    if (checkIfInMemoryInnerJoinIsPossible(context, plan.getLeftChild(), true)
+        || checkIfInMemoryInnerJoinIsPossible(context, plan.getRightChild(), false)) {
+      inMemoryHashJoin = true;
+    }
+
+    if (inMemoryHashJoin) {
+      LOG.info("Join (" + plan.getPID() +") chooses [In-memory Hash Join]");
+      // switchJoinSidesIfNecessary returns the two executors ordered by estimated size:
+      // the smaller one at index 0 and the larger one at index 1.
+      PhysicalExec [] orderedChilds = switchJoinSidesIfNecessary(context, plan, leftExec, rightExec);
+      return new HashJoinExec(context, plan, orderedChilds[1], orderedChilds[0]);
+    } else {
+      return createMergeInnerJoin(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private MergeJoinExec createMergeInnerJoin(TaskAttemptContext context, JoinNode plan,
+                                             PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    SortSpec[][] sortSpecs = PlannerUtil.getSortKeysFromJoinQual(
+        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+
+    SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    leftSortNode.setSortSpecs(sortSpecs[0]);
+    leftSortNode.setInSchema(leftExec.getSchema());
+    leftSortNode.setOutSchema(leftExec.getSchema());
+    ExternalSortExec outerSort = new ExternalSortExec(context, sm, leftSortNode, leftExec);
+
+    SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    rightSortNode.setSortSpecs(sortSpecs[1]);
+    rightSortNode.setInSchema(rightExec.getSchema());
+    rightSortNode.setOutSchema(rightExec.getSchema());
+    ExternalSortExec innerSort = new ExternalSortExec(context, sm, rightSortNode, rightExec);
+
+    LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]");
+    return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]);
+  }
+
+  private PhysicalExec createLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+        case NESTED_LOOP_JOIN:
+          //the right operand is too large, so we opt for NL implementation of left outer join
+          LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+          return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+        default:
+          LOG.error("Invalid Left Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback inner join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestLeftOuterJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestLeftOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                   PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
+    long rightTableVolume = estimateSizeRecursive(context, rightLineage);
+
+    if (rightTableVolume < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)) {
+      // we can implement left outer join using hash join, using the right operand as the build relation
+      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+      return new HashLeftOuterJoinExec(context, plan, leftExec, rightExec);
+    }
+    else {
+      //the right operand is too large, so we opt for NL implementation of left outer join
+      LOG.info("Left Outer Join (" + plan.getPID() +") chooses [Nested Loop Join].");
+      return new NLLeftOuterJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createBestRightJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    // If the left operand is small enough, implement the right outer join as a left outer
+    // hash join with the operands exchanged (note: this is blocking, but so is the merge join alternative).
+    String [] outerLineage4 = PlannerUtil.getRelationLineage(plan.getLeftChild());
+    long outerSize = estimateSizeRecursive(context, outerLineage4);
+    if (outerSize < conf.getLongVar(ConfVars.EXECUTOR_OUTER_JOIN_INMEMORY_HASH_THRESHOLD)){
+      LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+      return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+    } else {
+      return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createRightOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                     PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    //the left operand is too large, so opt for merge join implementation
+    LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Merge Join].");
+    SortSpec[][] sortSpecs2 = PlannerUtil.getSortKeysFromJoinQual(
+        plan.getJoinQual(), leftExec.getSchema(), rightExec.getSchema());
+
+    SortNode leftSortNode2 = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    leftSortNode2.setSortSpecs(sortSpecs2[0]);
+    leftSortNode2.setInSchema(leftExec.getSchema());
+    leftSortNode2.setOutSchema(leftExec.getSchema());
+    ExternalSortExec outerSort2 = new ExternalSortExec(context, sm, leftSortNode2, leftExec);
+
+    SortNode rightSortNode2 = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    rightSortNode2.setSortSpecs(sortSpecs2[1]);
+    rightSortNode2.setInSchema(rightExec.getSchema());
+    rightSortNode2.setOutSchema(rightExec.getSchema());
+    ExternalSortExec innerSort2 = new ExternalSortExec(context, sm, rightSortNode2, rightExec);
+
+    return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]);
+  }
+
+  private PhysicalExec createRightOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Right Outer Join (" + plan.getPID() +") chooses [Hash Join].");
+          return new HashLeftOuterJoinExec(context, plan, rightExec, leftExec);
+        case MERGE_JOIN:
+          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+        default:
+          LOG.error("Invalid Right Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createRightOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestRightJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private PhysicalExec createFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+
+        case MERGE_JOIN:
+          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+
+        default:
+          LOG.error("Invalid Full Outer Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback merge join algorithm: " + JoinAlgorithm.MERGE_JOIN.name());
+          return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+      }
+    } else {
+      return createBestFullOuterJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  private HashFullOuterJoinExec createFullOuterHashJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                            PhysicalExec leftExec, PhysicalExec rightExec)
+      throws IOException {
+    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
+    long outerSize2 = estimateSizeRecursive(context, leftLineage);
+    long innerSize2 = estimateSizeRecursive(context, rightLineage);
+
+    PhysicalExec selectedRight;
+    PhysicalExec selectedLeft;
+
+    // HashJoinExec loads the smaller relation to memory.
+    if (outerSize2 <= innerSize2) {
+      selectedLeft = leftExec;
+      selectedRight = rightExec;
+    } else {
+      selectedLeft = rightExec;
+      selectedRight = leftExec;
+    }
+    LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Hash Join]");
+    return new HashFullOuterJoinExec(context, plan, selectedRight, selectedLeft);
+  }
+
+  private MergeFullOuterJoinExec createFullOuterMergeJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                              PhysicalExec leftExec, PhysicalExec rightExec)
+      throws IOException {
+    // Both relations are too large for an in-memory hash join, so use the merge join implementation.
+    LOG.info("Full Outer Join (" + plan.getPID() +") chooses [Merge Join]");
+    SortSpec[][] sortSpecs3 = PlannerUtil.getSortKeysFromJoinQual(plan.getJoinQual(),
+        leftExec.getSchema(), rightExec.getSchema());
+
+    SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    leftSortNode.setSortSpecs(sortSpecs3[0]);
+    leftSortNode.setInSchema(leftExec.getSchema());
+    leftSortNode.setOutSchema(leftExec.getSchema());
+    ExternalSortExec outerSort3 = new ExternalSortExec(context, sm, leftSortNode, leftExec);
+
+    SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    rightSortNode.setSortSpecs(sortSpecs3[1]);
+    rightSortNode.setInSchema(rightExec.getSchema());
+    rightSortNode.setOutSchema(rightExec.getSchema());
+    ExternalSortExec innerSort3 = new ExternalSortExec(context, sm, rightSortNode, rightExec);
+
+    return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]);
+  }
+
+  private PhysicalExec createBestFullOuterJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                                   PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    String [] leftLineage = PlannerUtil.getRelationLineage(plan.getLeftChild());
+    String [] rightLineage = PlannerUtil.getRelationLineage(plan.getRightChild());
+    long outerSize2 = estimateSizeRecursive(context, leftLineage);
+    long innerSize2 = estimateSizeRecursive(context, rightLineage);
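+    // Hard-coded threshold: an in-memory hash join is chosen if either side is
+    // under 128 MB (1048576 * 128 bytes).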
+    final long threshold = 1048576 * 128;
+    if (outerSize2 < threshold || innerSize2 < threshold) {
+      return createFullOuterHashJoinPlan(context, plan, leftExec, rightExec);
+    } else {
+      return createFullOuterMergeJoinPlan(context, plan, leftExec, rightExec);
+    }
+  }
+
+  /**
+   *  Left semi join: the left side is the FROM side table, and the right side is the IN side table.
+   */
+  private PhysicalExec createLeftSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+
+        default:
+          LOG.error("Invalid Left Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback left semi join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+      }
+    } else {
+      LOG.info("Left Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  /**
+   *  Right semi join: the right side is the FROM side table, and the left side is the IN side table.
+   */
+  private PhysicalExec createRightSemiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Right Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+
+        default:
+          LOG.error("Invalid Right Semi Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback right semi join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+      }
+    } else {
+      LOG.info("Right Semi Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftSemiJoinExec(context, plan, rightExec, leftExec);
+    }
+  }
+
+  /**
+   *  Left anti join: the left side is the FROM side table, and the right side is the NOT IN side table.
+   */
+  private PhysicalExec createLeftAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                              PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Left Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+
+        default:
+          LOG.error("Invalid Left Anti Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback left anti join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+      }
+    } else {
+      LOG.info("Left Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftAntiJoinExec(context, plan, leftExec, rightExec);
+    }
+  }
+
+  /**
+   *  Right anti join: the right side is the FROM side table, and the left side is the NOT IN side table.
+   */
+  private PhysicalExec createRightAntiJoinPlan(TaskAttemptContext context, JoinNode plan,
+                                               PhysicalExec leftExec, PhysicalExec rightExec) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, plan);
+    if (property != null) {
+      JoinAlgorithm algorithm = property.getJoin().getAlgorithm();
+      switch (algorithm) {
+        case IN_MEMORY_HASH_JOIN:
+          LOG.info("Right Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+          return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
+
+        default:
+          LOG.error("Invalid Right Anti Join Algorithm Enforcer: " + algorithm.name());
+          LOG.error("Choose a fallback right anti join algorithm: " + JoinAlgorithm.IN_MEMORY_HASH_JOIN.name());
+          return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
+      }
+    } else {
+      LOG.info("Right Anti Join (" + plan.getPID() +") chooses [In Memory Hash Join].");
+      return new HashLeftAntiJoinExec(context, plan, rightExec, leftExec);
+    }
+  }
+
+
+  /**
+   * Create a shuffle file write executor to store intermediate data into local disks.
+   */
+  public PhysicalExec createShuffleFileWritePlan(TaskAttemptContext ctx,
+                                                 ShuffleFileWriteNode plan, PhysicalExec subOp) throws IOException {
+    switch (plan.getShuffleType()) {
+    case HASH_SHUFFLE:
+      return new HashShuffleFileWriteExec(ctx, sm, plan, subOp);
+
+    case RANGE_SHUFFLE:
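+      // A range shuffle needs sort keys: reuse the child's sort specs when a sort
+      // executor exists below; otherwise derive ascending specs from the shuffle keys.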
+      SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class);
+
+      SortSpec [] sortSpecs;
+      if (sortExec != null) {
+        sortSpecs = sortExec.getSortSpecs();
+      } else {
+        Column[] columns = ctx.getDataChannel().getShuffleKeys();
+        sortSpecs = new SortSpec[columns.length];
+        for (int i = 0; i < columns.length; i++) {
+          sortSpecs[i] = new SortSpec(columns[i]);
+        }
+      }
+      return new RangeShuffleFileWriteExec(ctx, sm, subOp, plan.getInSchema(), plan.getInSchema(), sortSpecs);
+
+    case NONE_SHUFFLE:
+      return new StoreTableExec(ctx, plan, subOp);
+
+    default:
+      throw new IllegalStateException(ctx.getDataChannel().getShuffleType() + " is not supported yet.");
+    }
+  }
+
+  /**
+   * Create an executor that stores a table into HDFS. It is used for
+   * CREATE TABLE ... AS and INSERT (OVERWRITE) INTO statements.
+   */
+  public PhysicalExec createStorePlan(TaskAttemptContext ctx,
+                                      StoreTableNode plan, PhysicalExec subOp) throws IOException {
+
+    if (plan.getPartitionMethod() != null) {
+      switch (plan.getPartitionMethod().getPartitionType()) {
+      case COLUMN:
+        return createColumnPartitionStorePlan(ctx, plan, subOp);
+      default:
+        throw new IllegalStateException(plan.getPartitionMethod().getPartitionType() + " is not supported yet.");
+      }
+    } else {
+      return new StoreTableExec(ctx, plan, subOp);
+    }
+  }
+
+  private PhysicalExec createColumnPartitionStorePlan(TaskAttemptContext context,
+                                                      StoreTableNode storeTableNode,
+                                                      PhysicalExec child) throws IOException {
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, storeTableNode);
+    if (property != null) {
+      ColumnPartitionAlgorithm algorithm = property.getColumnPartition().getAlgorithm();
+      switch (algorithm) {
+      case HASH_PARTITION:
+        return createHashColumnPartitionStorePlan(context, storeTableNode, child);
+      case SORT_PARTITION: // default algorithm
+      default:
+        return createSortBasedColumnPartitionStorePlan(context, storeTableNode, child);
+      }
+    } else { // default algorithm is sorted-based column partition
+      return createSortBasedColumnPartitionStorePlan(context, storeTableNode, child);
+    }
+  }
+
+  private PhysicalExec createHashColumnPartitionStorePlan(TaskAttemptContext context,
+                                                          StoreTableNode storeTableNode,
+                                                          PhysicalExec child) throws IOException {
+    LOG.info("The planner chooses [Hash-based Column Partitioned Store] algorithm");
+    return new HashBasedColPartitionStoreExec(context, storeTableNode, child);
+  }
+
+  private PhysicalExec createSortBasedColumnPartitionStorePlan(TaskAttemptContext context,
+                                                               StoreTableNode storeTableNode,
+                                                               PhysicalExec child) throws IOException {
+
+    Column[] partitionKeyColumns = storeTableNode.getPartitionMethod().getExpressionSchema().toArray();
+    SortSpec[] sortSpecs = new SortSpec[partitionKeyColumns.length];
+
+    if (storeTableNode.getType() == NodeType.INSERT) {
+      InsertNode insertNode = (InsertNode) storeTableNode;
+      int i = 0;
+      for (Column column : partitionKeyColumns) {
+        int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
+        sortSpecs[i++] = new SortSpec(insertNode.getProjectedSchema().getColumn(id), true, false);
+      }
+    } else {
+      for (int i = 0; i < partitionKeyColumns.length; i++) {
+        sortSpecs[i] = new SortSpec(partitionKeyColumns[i], true, false);
+      }
+    }
+
+    SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    sortNode.setSortSpecs(sortSpecs);
+    sortNode.setInSchema(child.getSchema());
+    sortNode.setOutSchema(child.getSchema());
+
+    ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, child);
+    LOG.info("The planner chooses [Sort-based Column Partitioned Store] algorithm");
+    return new SortBasedColPartitionStoreExec(context, storeTableNode, sortExec);
+  }
+
+  private boolean checkIfSortEquivalance(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node) {
+    Enforcer enforcer = ctx.getEnforcer();
+    List<EnforceProperty> property = enforcer.getEnforceProperties(EnforceType.SORTED_INPUT);
+    if (property != null && property.size() > 0 && node.peek().getType() == NodeType.SORT) {
+      SortNode sortNode = (SortNode) node.peek();
+      TajoWorkerProtocol.SortedInputEnforce sortEnforcer = property.get(0).getSortedInput();
+
+      boolean condition = scanNode.getTableName().equals(sortEnforcer.getTableName());
+      SortSpec [] sortSpecs = PlannerUtil.convertSortSpecs(sortEnforcer.getSortSpecsList());
+      return condition && TUtil.checkEquals(sortNode.getSortKeys(), sortSpecs);
+    } else {
+      return false;
+    }
+  }
+
+  public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> node)
+      throws IOException {
+    if (ctx.getTable(scanNode.getCanonicalName()) == null) {
+      return new SeqScanExec(ctx, sm, scanNode, null);
+    }
+    Preconditions.checkNotNull(ctx.getTable(scanNode.getCanonicalName()),
+        "Error: There is no table matched to %s", scanNode.getCanonicalName() + "(" + scanNode.getTableName() + ")");    
+
+    // Check whether the input is already sorted in the same order as the subsequent sort operator.
+    // TODO - this works only if the input files are raw files. We should check the file format.
+    // Since the default intermediate file format is a raw file, it is not a problem right now.
+    if (checkIfSortEquivalance(ctx, scanNode, node)) {
+      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+      return new ExternalSortExec(ctx, sm, (SortNode) node.peek(), fragments);
+    } else {
+      Enforcer enforcer = ctx.getEnforcer();
+
+      // check if this table is broadcasted one or not.
+      boolean broadcastFlag = false;
+      if (enforcer != null && enforcer.hasEnforceProperty(EnforceType.BROADCAST)) {
+        List<EnforceProperty> properties = enforcer.getEnforceProperties(EnforceType.BROADCAST);
+        for (EnforceProperty property : properties) {
+          broadcastFlag |= scanNode.getCanonicalName().equals(property.getBroadcast().getTableName());
+        }
+      }
+
+      if (broadcastFlag && scanNode instanceof PartitionedTableScanNode
+          && ((PartitionedTableScanNode) scanNode).getInputPaths() != null
+          && ((PartitionedTableScanNode) scanNode).getInputPaths().length > 0) {
+        PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
+        List<FileFragment> fileFragments = TUtil.newList();
+        for (Path path : partitionedTableScanNode.getInputPaths()) {
+          fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path)));
+        }
+
+        return new PartitionMergeScanExec(ctx, sm, scanNode,
+            FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])));
+      }
+
+      FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName());
+      return new SeqScanExec(ctx, sm, scanNode, fragments);
+    }
+  }
+
+  public PhysicalExec createGroupByPlan(TaskAttemptContext context, GroupbyNode groupbyNode, PhysicalExec subOp)
+      throws IOException {
+
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, groupbyNode);
+    if (property != null) {
+      GroupbyAlgorithm algorithm = property.getGroupby().getAlgorithm();
+      if (algorithm == GroupbyAlgorithm.HASH_AGGREGATION) {
+        return createInMemoryHashAggregation(context, groupbyNode, subOp);
+      } else {
+        return createSortAggregation(context, property, groupbyNode, subOp);
+      }
+    }
+    return createBestAggregationPlan(context, groupbyNode, subOp);
+  }
+
+  private PhysicalExec createInMemoryHashAggregation(TaskAttemptContext ctx, GroupbyNode groupbyNode, PhysicalExec subOp)
+      throws IOException {
+    LOG.info("The planner chooses [Hash Aggregation]");
+    return new HashAggregateExec(ctx, groupbyNode, subOp);
+  }
+
+  private PhysicalExec createSortAggregation(TaskAttemptContext ctx, EnforceProperty property, GroupbyNode groupbyNode,
+                                             PhysicalExec subOp) throws IOException {
+
+    Column[] grpColumns = groupbyNode.getGroupingColumns();
+    SortSpec[] sortSpecs = new SortSpec[grpColumns.length];
+    for (int i = 0; i < grpColumns.length; i++) {
+      sortSpecs[i] = new SortSpec(grpColumns[i], true, false);
+    }
+
+    if (property != null) {
+      List<CatalogProtos.SortSpecProto> sortSpecProtos = property.getGroupby().getSortSpecsList();
+
+      List<SortSpec> enforcedSortSpecList = Lists.newArrayList();
+      // The label lets the inner loop skip enforced sort keys that already
+      // appear among the grouping columns.
+      outer:
+      for (int j = 0; j < sortSpecProtos.size(); j++) {
+        SortSpec enforcedSortSpecs = new SortSpec(sortSpecProtos.get(j));
+
+        for (Column grpKey : grpColumns) { // if this sort key is included in grouping columns, skip it.
+          if (enforcedSortSpecs.getSortKey().equals(grpKey)) {
+            continue outer;
+          }
+        }
+
+        enforcedSortSpecList.add(enforcedSortSpecs);
+      }
+
+      sortSpecs = ObjectArrays.concat(sortSpecs, TUtil.toArray(enforcedSortSpecList, SortSpec.class), SortSpec.class);
+    }
+
+    SortNode sortNode = LogicalPlan.createNodeWithoutPID(SortNode.class);
+    sortNode.setSortSpecs(sortSpecs);
+    sortNode.setInSchema(subOp.getSchema());
+    sortNode.setOutSchema(subOp.getSchema());
+    ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp);
+    LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")");
+    return new SortAggregateExec(ctx, groupbyNode, sortExec);
+  }
+
+  private PhysicalExec createBestAggregationPlan(TaskAttemptContext context, GroupbyNode groupbyNode,
+                                                 PhysicalExec subOp) throws IOException {
+    Column[] grpColumns = groupbyNode.getGroupingColumns();
+    if (grpColumns.length == 0) {
+      return createInMemoryHashAggregation(context, groupbyNode, subOp);
+    }
+
+    String [] outerLineage = PlannerUtil.getRelationLineage(groupbyNode.getChild());
+    long estimatedSize = estimateSizeRecursive(context, outerLineage);
+    final long threshold = conf.getLongVar(ConfVars.EXECUTOR_GROUPBY_INMEMORY_HASH_THRESHOLD);
+
+    // if the relation size is less than the threshold,
+    // the hash aggregation will be used.
+    if (estimatedSize <= threshold) {
+      LOG.info("The planner chooses [Hash Aggregation]");
+      return createInMemoryHashAggregation(context, groupbyNode, subOp);
+    } else {
+      return createSortAggregation(context, null, groupbyNode, subOp);
+    }
+  }
+
+  public PhysicalExec createSortPlan(TaskAttemptContext context, SortNode sortNode,
+                                     PhysicalExec child) throws IOException {
+
+    // Check whether this is a distributed merge sort. If so, we do not need to
+    // create a sort executor here because one was already created during scan planning.
+    if (child instanceof SortExec) {
+      SortExec childSortExec = (SortExec) child;
+      if (TUtil.checkEquals(sortNode.getSortKeys(), childSortExec.getSortSpecs())) {
+        return child;
+      }
+    }
+
+    Enforcer enforcer = context.getEnforcer();
+    EnforceProperty property = getAlgorithmEnforceProperty(enforcer, sortNode);
+    if (property != null) {
+      SortEnforce.SortAlgorithm algorithm = property.getSort().getAlgorithm();
+      if (algorithm == SortEnforce.SortAlgorithm.IN_MEMORY_SORT) {
+        return new MemSortExec(context, sortNode, child);
+      } else {
+        return new ExternalSortExec(context, sm, sortNode, child);
+      }
+    }
+
+    return createBestSortPlan(context, sortNode, child);
+  }
+
+  public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode,
+                                     PhysicalExec child) throws IOException {
+    return new ExternalSortExec(context, sm, sortNode, child);
+  }
+
+  public PhysicalExec createIndexScanExec(TaskAttemptContext ctx,
+                                          IndexScanNode annotation)
+      throws IOException {
+    //TODO-general Type Index
+    Preconditions.checkNotNull(ctx.getTable(annotation.getCanonicalName()),
+        "Error: There is no table matched to %s", annotation.getCanonicalName());
+
+    FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName());
+    List<FileFragment> fragments =
+        FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos);
+
+    String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
+    Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
+
+    TupleComparator comp = new TupleComparator(annotation.getKeySchema(),
+        annotation.getSortKeys());
+    return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName),
+        annotation.getKeySchema(), comp, annotation.getDatum());
+
+  }
+
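+  /**
+   * Looks up the algorithm enforce property attached to the given node, if any.
+   * The enforce type is derived from the node type (join, group-by, sort, or
+   * column partition), and a property matches only when its PID equals the node's PID.
+   */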
+  private EnforceProperty getAlgorithmEnforceProperty(Enforcer enforcer, LogicalNode node) {
+    if (enforcer == null) {
+      return null;
+    }
+
+    EnforceType type;
+    if (node.getType() == NodeType.JOIN) {
+      type = EnforceType.JOIN;
+    } else if (node.getType() == NodeType.GROUP_BY) {
+      type = EnforceType.GROUP_BY;
+    } else if (node.getType() == NodeType.SORT) {
+      type = EnforceType.SORT;
+    } else if (node instanceof StoreTableNode
+        && ((StoreTableNode)node).hasPartition()
+        && ((StoreTableNode)node).getPartitionMethod().getPartitionType() == PartitionType.COLUMN) {
+      type = EnforceType.COLUMN_PARTITION;
+    } else {
+      return null;
+    }
+
+    if (enforcer.hasEnforceProperty(type)) {
+      List<EnforceProperty> properties = enforcer.getEnforceProperties(type);
+      EnforceProperty found = null;
+      for (EnforceProperty property : properties) {
+        if (type == EnforceType.JOIN && property.getJoin().getPid() == node.getPID()) {
+          found = property;
+        } else if (type == EnforceType.GROUP_BY && property.getGroupby().getPid() == node.getPID()) {
+          found = property;
+        } else if (type == EnforceType.SORT && property.getSort().getPid() == node.getPID()) {
+          found = property;
+        } else if (type == EnforceType.COLUMN_PARTITION && property.getColumnPartition().getPid() == node.getPID()) {
+          found = property;
+        }
+      }
+      return found;
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
new file mode 100644
index 0000000..1b0a7c3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanningException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import java.io.IOException;
+
+public class PhysicalPlanningException extends IOException {
+  public PhysicalPlanningException(String message) {
+    super(message);
+  }
+
+  public PhysicalPlanningException(Throwable t) {
+    super(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanString.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanString.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanString.java
new file mode 100644
index 0000000..98f921b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanString.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
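+/**
+ * A builder for human-readable plan descriptions: a title line followed by
+ * indented explanation and detail lines. Illustrative output (not taken from
+ * an actual plan):
+ *
+ * <pre>
+ * JOIN(12)
+ *   => in-memory hash join
+ *   => est. input size: 64 MB
+ * </pre>
+ */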
+public class PlanString {
+  final StringBuilder title;
+
+  final List<String> explanations = new ArrayList<String>();
+  final List<String> details = new ArrayList<String>();
+
+  StringBuilder currentExplanation;
+  StringBuilder currentDetail;
+
+  public PlanString(LogicalNode node) {
+    this.title = new StringBuilder(node.getType().name() + "(" + node.getPID() + ")");
+  }
+
+  public PlanString(String title) {
+    this.title = new StringBuilder(title);
+  }
+
+  public PlanString appendTitle(String str) {
+    title.append(str);
+    return this;
+  }
+
+  public PlanString addExplan(String explain) {
+    flushCurrentExplanation();
+    currentExplanation = new StringBuilder(explain);
+    return this;
+  }
+
+  public PlanString appendExplain(String explain) {
+    if (currentExplanation == null) {
+      currentExplanation = new StringBuilder();
+    }
+    currentExplanation.append(explain);
+    return this;
+  }
+
+  public PlanString addDetail(String detail) {
+    flushCurrentDetail();
+    currentDetail = new StringBuilder(detail);
+    return this;
+  }
+
+  public PlanString appendDetail(String detail) {
+    if (currentDetail == null) {
+      currentDetail = new StringBuilder();
+    }
+    currentDetail.append(detail);
+    return this;
+
+  }
+
+  public String getTitle() {
+    return title.toString();
+  }
+
+  public List<String> getExplanations() {
+    flushCurrentExplanation();
+    return explanations;
+  }
+
+  public List<String> getDetails() {
+    flushCurrentDetail();
+    return details;
+  }
+
+  private void flushCurrentExplanation() {
+    if (currentExplanation != null) {
+      explanations.add(currentExplanation.toString());
+      currentExplanation = null;
+    }
+  }
+
+  private void flushCurrentDetail() {
+    if (currentDetail != null) {
+      details.add(currentDetail.toString());
+      currentDetail = null;
+    }
+  }
+
+  public String toString() {
+    StringBuilder output = new StringBuilder();
+    output.append(getTitle()).append("\n");
+
+    for (String str : getExplanations()) {
+      output.append("  => ").append(str).append("\n");
+    }
+
+    for (String str : getDetails()) {
+      output.append("  => ").append(str).append("\n");
+    }
+    return output.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
new file mode 100644
index 0000000..9f988bd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+public class PlannerUtil {
+
+  public static boolean checkIfDDLPlan(LogicalNode node) {
+    LogicalNode baseNode = node;
+    if (node instanceof LogicalRootNode) {
+      baseNode = ((LogicalRootNode) node).getChild();
+    }
+
+    NodeType type = baseNode.getType();
+
+    return type == NodeType.CREATE_DATABASE ||
+        type == NodeType.DROP_DATABASE ||
+        (type == NodeType.CREATE_TABLE && !((CreateTableNode) baseNode).hasSubQuery()) ||
+        type == NodeType.DROP_TABLE ||
+        type == NodeType.ALTER_TABLESPACE ||
+        type == NodeType.ALTER_TABLE;
+  }
+
+  /**
+   * Checks whether the query is simple or not.
+   * A simple query has the form {@code SELECT * FROM tb_name [LIMIT x]}.
+   *
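+   * For example, {@code SELECT * FROM t1 LIMIT 10} is simple, whereas
+   * {@code SELECT c1 + 1 FROM t1} is not, because its target list is not a
+   * bare column projection.
+   *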
+   * @param plan The logical plan
+   * @return True if the query is a simple query.
+   */
+  public static boolean checkIfSimpleQuery(LogicalPlan plan) {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+    // one block, without where clause, no group-by, no-sort, no-join
+    boolean isOneQueryBlock = plan.getQueryBlocks().size() == 1;
+    boolean simpleOperator = rootNode.getChild().getType() == NodeType.LIMIT
+        || rootNode.getChild().getType() == NodeType.SCAN;
+    boolean noOrderBy = !plan.getRootBlock().hasNode(NodeType.SORT);
+    boolean noGroupBy = !plan.getRootBlock().hasNode(NodeType.GROUP_BY);
+    boolean noWhere = !plan.getRootBlock().hasNode(NodeType.SELECTION);
+    boolean noJoin = !plan.getRootBlock().hasNode(NodeType.JOIN);
+    boolean singleRelation = plan.getRootBlock().hasNode(NodeType.SCAN)
+        && PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1;
+
+    boolean noComplexComputation = false;
+    if (singleRelation) {
+      ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+      if (!scanNode.getTableDesc().hasPartition() && scanNode.hasTargets()
+          && scanNode.getTargets().length == scanNode.getInSchema().size()) {
+        noComplexComputation = true;
+        for (int i = 0; i < scanNode.getTargets().length; i++) {
+          Target target = scanNode.getTargets()[i];
+          // Every target must be a bare field reference to the i-th input column.
+          if (target.getEvalTree().getType() != EvalType.FIELD
+              || !target.getNamedColumn().equals(scanNode.getInSchema().getColumn(i))) {
+            return false;
+          }
+        }
+      }
+    }
+
+    return !checkIfDDLPlan(rootNode) &&
+        (simpleOperator && noComplexComputation && isOneQueryBlock && noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
+  }
+
+  /**
+   * Checks whether the query has 'from clause' or not.
+   *
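+   * For example, {@code SELECT 1 + 1} references no relation, so it is a
+   * non-FROM query.
+   *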
+   * @param plan The logical plan
+   * @return True if a query does not have 'from clause'.
+   */
+  public static boolean checkIfNonFromQuery(LogicalPlan plan) {
+    LogicalNode node = plan.getRootBlock().getRoot();
+
+    // a single query block with no relation in the algebraic expression
+    boolean isOneQueryBlock = plan.getQueryBlocks().size() == 1;
+    boolean noRelation = !plan.getRootBlock().hasAlgebraicExpr(OpType.Relation);
+
+    return !checkIfDDLPlan(node) && noRelation && isOneQueryBlock;
+  }
+
+  /**
+   * Get the canonical names of all relations which are descendants of a given LogicalNode.
+   *
+   * @param from The LogicalNode to start visiting LogicalNodes from.
+   * @return an array of the canonical names of all descendant relations.
+   */
+  public static String[] getRelationLineage(LogicalNode from) {
+    LogicalNode[] scans = findAllNodes(from, NodeType.SCAN, NodeType.PARTITIONS_SCAN);
+    String[] tableNames = new String[scans.length];
+    ScanNode scan;
+    for (int i = 0; i < scans.length; i++) {
+      scan = (ScanNode) scans[i];
+      tableNames[i] = scan.getCanonicalName();
+    }
+    return tableNames;
+  }
+
+  /**
+   * Get the canonical names of all relations which are descendants of a given LogicalNode.
+   * The search is restricted to the query block containing the given node.
+   *
+   * @param plan The logical plan containing the node.
+   * @param from The LogicalNode to start visiting LogicalNodes from.
+   * @return a collection of the canonical names of all descendant relations.
+   */
+  public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode from)
+      throws PlanningException {
+    RelationFinderVisitor visitor = new RelationFinderVisitor();
+    visitor.visit(null, plan, null, from, new Stack<LogicalNode>());
+    return visitor.getFoundRelations();
+  }
+
+  public static class RelationFinderVisitor extends BasicLogicalPlanVisitor<Object, LogicalNode> {
+    private Set<String> foundRelNameSet = Sets.newHashSet();
+
+    public Set<String> getFoundRelations() {
+      return foundRelNameSet;
+    }
+
+    @Override
+    public LogicalNode visit(Object context, LogicalPlan plan, @Nullable LogicalPlan.QueryBlock block, LogicalNode node,
+                             Stack<LogicalNode> stack) throws PlanningException {
+      if (node.getType() != NodeType.TABLE_SUBQUERY) {
+        super.visit(context, plan, block, node, stack);
+      }
+
+      if (node instanceof RelationNode) {
+        foundRelNameSet.add(((RelationNode) node).getCanonicalName());
+      }
+
+      return node;
+    }
+  }
+
+  /**
+   * Delete the logical node from a plan.
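+   * For example (with hypothetical nodes), {@code deleteNode(root, limitNode)}
+   * reconnects {@code root} directly to the limit node's child and returns the
+   * removed limit node.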
+   *
+   * @param parent      this node must be a parent node of one node to be removed.
+   * @param tobeRemoved this node must be a child node of the parent.
+   * @return the removed node.
+   */
+  public static LogicalNode deleteNode(LogicalNode parent, LogicalNode tobeRemoved) {
+    Preconditions.checkArgument(tobeRemoved instanceof UnaryNode,
+        "ERROR: the logical node to be removed must be unary node.");
+
+    UnaryNode child = (UnaryNode) tobeRemoved;
+    LogicalNode grandChild = child.getChild();
+    if (parent instanceof UnaryNode) {
+      UnaryNode unaryParent = (UnaryNode) parent;
+
+      Preconditions.checkArgument(unaryParent.getChild() == child,
+          "ERROR: both logical node must be parent and child nodes");
+      unaryParent.setChild(grandChild);
+
+    } else if (parent instanceof BinaryNode) {
+      BinaryNode binaryParent = (BinaryNode) parent;
+      if (binaryParent.getLeftChild().deepEquals(child)) {
+        binaryParent.setLeftChild(grandChild);
+      } else if (binaryParent.getRightChild().deepEquals(child)) {
+        binaryParent.setRightChild(grandChild);
+      } else {
+        throw new IllegalStateException("ERROR: both logical node must be parent and child nodes");
+      }
+    } else {
+      throw new InvalidQueryException("Unexpected logical plan: " + parent);
+    }
+    return child;
+  }
+
+  public static void replaceNode(LogicalPlan plan, LogicalNode startNode, LogicalNode oldNode, LogicalNode newNode) {
+    LogicalNodeReplaceVisitor replacer = new LogicalNodeReplaceVisitor(oldNode, newNode);
+    try {
+      replacer.visit(new ReplacerContext(), plan, null, startNode, new Stack<LogicalNode>());
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static class ReplacerContext {
+    boolean updateSchemaFlag = false;
+  }
+
+  public static class LogicalNodeReplaceVisitor extends BasicLogicalPlanVisitor<ReplacerContext, LogicalNode> {
+    private LogicalNode target;
+    private LogicalNode tobeReplaced;
+
+    public LogicalNodeReplaceVisitor(LogicalNode target, LogicalNode tobeReplaced) {
+      this.target = target;
+      this.tobeReplaced = tobeReplaced;
+    }
+
+    /**
+     * Returns true if the given node can have children, i.e., it is a unary or binary node.
+     */
+    private static boolean checkIfVisitable(LogicalNode node) {
+      return node instanceof UnaryNode || node instanceof BinaryNode;
+    }
+
+    @Override
+    public LogicalNode visit(ReplacerContext context, LogicalPlan plan, @Nullable LogicalPlan.QueryBlock block,
+                             LogicalNode node, Stack<LogicalNode> stack) throws PlanningException {
+      LogicalNode left = null;
+      LogicalNode right = null;
+
+      if (node instanceof UnaryNode) {
+        UnaryNode unaryNode = (UnaryNode) node;
+        if (unaryNode.getChild().deepEquals(target)) {
+          unaryNode.setChild(tobeReplaced);
+          left = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(unaryNode.getChild())) {
+          left = visit(context, plan, null, unaryNode.getChild(), stack);
+        }
+      } else if (node instanceof BinaryNode) {
+        BinaryNode binaryNode = (BinaryNode) node;
+        if (binaryNode.getLeftChild().deepEquals(target)) {
+          binaryNode.setLeftChild(tobeReplaced);
+          left = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(binaryNode.getLeftChild())) {
+          left = visit(context, plan, null, binaryNode.getLeftChild(), stack);
+        } else {
+          left = binaryNode.getLeftChild();
+        }
+
+        if (binaryNode.getRightChild().deepEquals(target)) {
+          binaryNode.setRightChild(tobeReplaced);
+          right = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(binaryNode.getRightChild())) {
+          right = visit(context, plan, null, binaryNode.getRightChild(), stack);
+        } else {
+          right = binaryNode.getRightChild();
+        }
+      }
+
+      // update schemas of nodes except for leaf node (i.e., RelationNode)
+      if (context.updateSchemaFlag) {
+        if (node instanceof Projectable) {
+          if (node instanceof BinaryNode) {
+            node.setInSchema(SchemaUtil.merge(left.getOutSchema(), right.getOutSchema()));
+          } else {
+            node.setInSchema(left.getOutSchema());
+          }
+          context.updateSchemaFlag = false;
+        } else {
+          node.setInSchema(left.getOutSchema());
+          node.setOutSchema(left.getOutSchema());
+        }
+      }
+      return node;
+    }
+
+    @Override
+    public LogicalNode visitScan(ReplacerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                                 Stack<LogicalNode> stack) throws PlanningException {
+      return node;
+    }
+
+    @Override
+    public LogicalNode visitPartitionedTableScan(ReplacerContext context, LogicalPlan plan,
+                                                 LogicalPlan.QueryBlock block, PartitionedTableScanNode node,
+                                                 Stack<LogicalNode> stack) throws PlanningException {
+      return node;
+    }
+  }
+
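+  /**
+   * Replace the topmost node of the given type with newNode. The old node's
+   * child subtree, if any, is re-attached beneath newNode.
+   */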
+  public static void replaceNode(LogicalNode plan, LogicalNode newNode, NodeType type) {
+    LogicalNode parent = findTopParentNode(plan, type);
+    Preconditions.checkArgument(parent instanceof UnaryNode);
+    Preconditions.checkArgument(!(newNode instanceof BinaryNode));
+    UnaryNode parentNode = (UnaryNode) parent;
+    LogicalNode child = parentNode.getChild();
+    if (child instanceof UnaryNode) {
+      ((UnaryNode) newNode).setChild(((UnaryNode) child).getChild());
+    }
+    parentNode.setChild(newNode);
+  }
+
+  /**
+   * Find the topmost logical node of the given type, starting from the given node.
+   *
+   * @param node the node to start searching from
+   * @param type the node type to find
+   * @return the first matching logical node, or null if none is found
+   */
+  public static <T extends LogicalNode> T findTopNode(LogicalNode node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.preOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(0);
+  }
+
+  /**
+   * Find the bottommost logical node of the given type, starting from the given node.
+   *
+   * @param node the node to start searching from
+   * @param type the node type to find
+   * @return the last matching logical node in pre-order, or null if none is found
+   */
+  public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.preOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(finder.getFoundNodes().size() - 1);
+  }
+
+  /**
+   * Find all logical nodes matching any of the given types, starting from the given node.
+   *
+   * @param node the node to start searching from
+   * @param type the node types to find
+   * @return an array of the found logical nodes; empty if none are found
+   */
+  public static LogicalNode[] findAllNodes(LogicalNode node, NodeType... type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.postOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return new LogicalNode[]{};
+    }
+    List<LogicalNode> founds = finder.getFoundNodes();
+    return founds.toArray(new LogicalNode[founds.size()]);
+  }
+
+  /**
+   * Find a parent node whose child matches the given type.
+   *
+   * @param node the node to start searching from
+   * @param type the child node type to find
+   * @return the parent of a found child node, or null if none is found
+   */
+  public static <T extends LogicalNode> T findTopParentNode(LogicalNode node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    ParentNodeFinder finder = new ParentNodeFinder(type);
+    node.postOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(0);
+  }
+
+  private static class LogicalNodeFinder implements LogicalNodeVisitor {
+    private List<LogicalNode> list = new ArrayList<LogicalNode>();
+    private final NodeType[] tofind;
+    private boolean topmost = false;
+    private boolean finished = false;
+
+    public LogicalNodeFinder(NodeType... type) {
+      this.tofind = type;
+    }
+
+    public LogicalNodeFinder(NodeType[] type, boolean topmost) {
+      this(type);
+      this.topmost = topmost;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      if (!finished) {
+        for (NodeType type : tofind) {
+          if (node.getType() == type) {
+            list.add(node);
+          }
+          if (topmost && list.size() > 0) {
+            finished = true;
+          }
+        }
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return list;
+    }
+
+    public LogicalNode[] getFoundNodeArray() {
+      return list.toArray(new LogicalNode[list.size()]);
+    }
+  }
+
+  private static class ParentNodeFinder implements LogicalNodeVisitor {
+    private List<LogicalNode> list = new ArrayList<LogicalNode>();
+    private NodeType tofind;
+
+    public ParentNodeFinder(NodeType type) {
+      this.tofind = type;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        if (unary.getChild().getType() == tofind) {
+          list.add(node);
+        }
+      } else if (node instanceof BinaryNode) {
+        BinaryNode bin = (BinaryNode) node;
+        if (bin.getLeftChild().getType() == tofind ||
+            bin.getRightChild().getType() == tofind) {
+          list.add(node);
+        }
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return list;
+    }
+  }
+
+  /**
+   * Fill the given target array with FieldEvals created from the schema's columns.
+   *
+   * @param schema  the schema to be transformed into targets
+   * @param targets the array to be filled; it must be at least as large as the schema
+   */
+  public static void schemaToTargets(Schema schema, Target[] targets) {
+    FieldEval eval;
+    for (int i = 0; i < schema.size(); i++) {
+      eval = new FieldEval(schema.getColumn(i));
+      targets[i] = new Target(eval);
+    }
+  }
+
+  public static Target[] schemaToTargets(Schema schema) {
+    Target[] targets = new Target[schema.size()];
+
+    FieldEval eval;
+    for (int i = 0; i < schema.size(); i++) {
+      eval = new FieldEval(schema.getColumn(i));
+      targets[i] = new Target(eval);
+    }
+    return targets;
+  }
+
+  public static Target[] schemaToTargetsWithGeneratedFields(Schema schema) {
+    List<Target> targets = TUtil.newList();
+
+    FieldEval eval;
+    for (int i = 0; i < schema.size(); i++) {
+      eval = new FieldEval(schema.getColumn(i));
+      targets.add(new Target(eval));
+    }
+    return targets.toArray(new Target[targets.size()]);
+  }
+
+  public static SortSpec[] schemaToSortSpecs(Schema schema) {
+    return schemaToSortSpecs(schema.toArray());
+  }
+
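+  /**
+   * Create a default SortSpec for each column: ascending order, nulls not
+   * placed first (assuming the constructor signature is
+   * SortSpec(column, ascending, nullFirst)).
+   */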
+  public static SortSpec[] schemaToSortSpecs(Column[] columns) {
+    SortSpec[] specs = new SortSpec[columns.length];
+
+    for (int i = 0; i < columns.length; i++) {
+      specs[i] = new SortSpec(columns[i], true, false);
+    }
+
+    return specs;
+  }
+
+  public static SortSpec[] columnsToSortSpec(Collection<Column> columns) {
+    SortSpec[] specs = new SortSpec[columns.size()];
+    int i = 0;
+    for (Column column : columns) {
+      specs[i++] = new SortSpec(column, true, false);
+    }
+
+    return specs;
+  }
+
+  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
+    Schema schema = new Schema();
+    for (SortSpec spec : sortSpecs) {
+      schema.addColumn(spec.getSortKey());
+    }
+
+    return schema;
+  }
+
+  public static SortSpec[][] getSortKeysFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
+    // This method is used by the merge join executor. Merge join supports only
+    // equi-joins, so the theta-join flag must be false.
+    List<Column[]> joinKeyPairs = getJoinKeyPairs(joinQual, outer, inner, false);
+    SortSpec[] outerSortSpec = new SortSpec[joinKeyPairs.size()];
+    SortSpec[] innerSortSpec = new SortSpec[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      outerSortSpec[i] = new SortSpec(joinKeyPairs.get(i)[0]);
+      innerSortSpec[i] = new SortSpec(joinKeyPairs.get(i)[1]);
+    }
+
+    return new SortSpec[][]{outerSortSpec, innerSortSpec};
+  }
+
+  public static TupleComparator[] getComparatorsFromJoinQual(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    SortSpec[][] sortSpecs = getSortKeysFromJoinQual(joinQual, leftSchema, rightSchema);
+    TupleComparator[] comparators = new TupleComparator[2];
+    comparators[0] = new TupleComparator(leftSchema, sortSpecs[0]);
+    comparators[1] = new TupleComparator(rightSchema, sortSpecs[1]);
+    return comparators;
+  }
+
+  /**
+   * @return two arrays: the first contains the left table's join-key columns, and the second contains the right table's.
+   */
+  public static Column[][] joinJoinKeyForEachTable(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
+    List<Column[]> joinKeys = getJoinKeyPairs(joinQual, leftSchema, rightSchema, true);
+    Column[] leftColumns = new Column[joinKeys.size()];
+    Column[] rightColumns = new Column[joinKeys.size()];
+    for (int i = 0; i < joinKeys.size(); i++) {
+      leftColumns[i] = joinKeys.get(i)[0];
+      rightColumns[i] = joinKeys.get(i)[1];
+    }
+
+    return new Column[][]{leftColumns, rightColumns};
+  }
+
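+  /**
+   * Extract the pairs of join-key columns referenced by a join predicate.
+   * For a qual such as {@code t1.id = t2.id} (an illustrative example), the
+   * result contains the single pair [t1.id, t2.id].
+   */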
+  public static List<Column[]> getJoinKeyPairs(EvalNode joinQual, Schema leftSchema, Schema rightSchema,
+                                               boolean includeThetaJoin) {
+    JoinKeyPairFinder finder = new JoinKeyPairFinder(includeThetaJoin, leftSchema, rightSchema);
+    joinQual.preOrder(finder);
+    return finder.getPairs();
+  }
+
+  public static class JoinKeyPairFinder implements EvalNodeVisitor {
+    private boolean includeThetaJoin;
+    private final List<Column[]> pairs = Lists.newArrayList();
+    private Schema[] schemas = new Schema[2];
+
+    public JoinKeyPairFinder(boolean includeThetaJoin, Schema outer, Schema inner) {
+      this.includeThetaJoin = includeThetaJoin;
+      schemas[0] = outer;
+      schemas[1] = inner;
+    }
+
+    @Override
+    public void visit(EvalNode node) {
+      if (EvalTreeUtil.isJoinQual(node, includeThetaJoin)) {
+        Column[] pair = new Column[2];
+
+        for (int i = 0; i <= 1; i++) { // access left, right sub expression
+          Column column = EvalTreeUtil.findAllColumnRefs(node.getExpr(i)).get(0);
+          for (int j = 0; j < schemas.length; j++) {
+            // check whether the column is for either outer or inner
+            // 0 is outer, and 1 is inner
+            if (schemas[j].containsByQualifiedName(column.getQualifiedName())) {
+              pair[j] = column;
+            }
+          }
+        }
+
+        if (pair[0] == null || pair[1] == null) {
+          throw new IllegalStateException("Wrong join key: " + node);
+        }
+        pairs.add(pair);
+      }
+    }
+
+    public List<Column[]> getPairs() {
+      return this.pairs;
+    }
+  }
+
+  public static Schema targetToSchema(Collection<Target> targets) {
+    return targetToSchema(targets.toArray(new Target[targets.size()]));
+  }
+
+  public static Schema targetToSchema(Target[] targets) {
+    Schema schema = new Schema();
+    for (Target t : targets) {
+      DataType type = t.getEvalTree().getValueType();
+      String name;
+      if (t.hasAlias()) {
+        name = t.getAlias();
+      } else {
+        name = t.getEvalTree().getName();
+      }
+      if (!schema.containsByQualifiedName(name)) {
+        schema.addColumn(name, type);
+      }
+    }
+
+    return schema;
+  }
+
+  /**
+   * Remove all table qualifiers from the FieldEvals in the given targets.
+   *
+   * @param sourceTargets The targets to be stripped
+   * @return The stripped targets
+   */
+  public static Target[] stripTarget(Target[] sourceTargets) {
+    Target[] copy = new Target[sourceTargets.length];
+    for (int i = 0; i < sourceTargets.length; i++) {
+      try {
+        copy[i] = (Target) sourceTargets[i].clone();
+      } catch (CloneNotSupportedException e) {
+        throw new RuntimeException(e);
+      }
+      if (copy[i].getEvalTree().getType() == EvalType.FIELD) {
+        FieldEval fieldEval = (FieldEval) copy[i].getEvalTree();
+        if (fieldEval.getColumnRef().hasQualifier()) {
+          fieldEval.replaceColumnRef(fieldEval.getColumnName());
+        }
+      }
+    }
+
+    return copy;
+  }
+
+  public static <T extends LogicalNode> T clone(LogicalPlan plan, LogicalNode node) {
+    try {
+      T copy = (T) node.clone();
+      if (plan == null) {
+        copy.setPID(-1);
+      } else {
+        copy.setPID(plan.newPID());
+      }
+      return copy;
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
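+  /** Only inner joins are commutative; reordering an outer join would change its result. */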
+  public static boolean isCommutativeJoin(JoinType joinType) {
+    return joinType == JoinType.INNER;
+  }
+
+  public static boolean existsAggregationFunction(Expr expr) throws PlanningException {
+    AggregationFunctionFinder finder = new AggregationFunctionFinder();
+    AggFunctionFoundResult result = new AggFunctionFoundResult();
+    finder.visit(result, new Stack<Expr>(), expr);
+    return result.generalSetFunction;
+  }
+
+  public static boolean existsDistinctAggregationFunction(Expr expr) throws PlanningException {
+    AggregationFunctionFinder finder = new AggregationFunctionFinder();
+    AggFunctionFoundResult result = new AggFunctionFoundResult();
+    finder.visit(result, new Stack<Expr>(), expr);
+    return result.distinctSetFunction;
+  }
+
+  static class AggFunctionFoundResult {
+    boolean generalSetFunction;
+    boolean distinctSetFunction;
+  }
+
+  static class AggregationFunctionFinder extends SimpleAlgebraVisitor<AggFunctionFoundResult, Object> {
+    @Override
+    public Object visitCountRowsFunction(AggFunctionFoundResult ctx, Stack<Expr> stack, CountRowsFunctionExpr expr)
+        throws PlanningException {
+      ctx.generalSetFunction = true;
+      return super.visitCountRowsFunction(ctx, stack, expr);
+    }
+
+    @Override
+    public Object visitGeneralSetFunction(AggFunctionFoundResult ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
+        throws PlanningException {
+      ctx.generalSetFunction = true;
+      // OR-accumulate so an earlier DISTINCT function is not lost when a later
+      // non-DISTINCT function is visited.
+      ctx.distinctSetFunction |= expr.isDistinct();
+      return super.visitGeneralSetFunction(ctx, stack, expr);
+    }
+  }
+
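+  /**
+   * Prefix each field name with the given qualifier, replacing any existing
+   * qualifier. Note that names are assumed to have at most two parts; for a
+   * three-part name like db.tb.col, this keeps the middle part rather than
+   * the column name.
+   */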
+  public static Collection<String> toQualifiedFieldNames(Collection<String> fieldNames, String qualifier) {
+    List<String> names = TUtil.newList();
+    for (String n : fieldNames) {
+      String[] parts = n.split("\\.");
+      if (parts.length == 1) {
+        names.add(qualifier + "." + parts[0]);
+      } else {
+        names.add(qualifier + "." + parts[1]);
+      }
+    }
+    return names;
+  }
+
+  public static SortSpec[] convertSortSpecs(Collection<CatalogProtos.SortSpecProto> sortSpecProtos) {
+    SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()];
+    int i = 0;
+    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
+      sortSpecs[i++] = new SortSpec(proto);
+    }
+    return sortSpecs;
+  }
+
+  /**
+   * Generate an explain string of a LogicalNode and its descendant nodes.
+   *
+   * @param node the LogicalNode from which to start
+   * @return a pretty-printed explain string
+   */
+  public static String buildExplainString(LogicalNode node) {
+    ExplainLogicalPlanVisitor explain = new ExplainLogicalPlanVisitor();
+
+    StringBuilder explains = new StringBuilder();
+    try {
+      ExplainLogicalPlanVisitor.Context explainContext = explain.getBlockPlanStrings(null, node);
+      while (!explainContext.explains.empty()) {
+        explains.append(
+            ExplainLogicalPlanVisitor.printDepthString(explainContext.getMaxDepth(), explainContext.explains.pop()));
+      }
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+
+    return explains.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanningException.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanningException.java
new file mode 100644
index 0000000..4fa88ee
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlanningException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+public class PlanningException extends Exception {
+  public PlanningException(String message) {
+    super(message);
+  }
+
+  public PlanningException(Exception e) {
+    super(e);
+  }
+}

