tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [02/28] TAJO-1125: Separate logical plan and optimizer into a maven module.
Date Sat, 25 Oct 2014 18:17:51 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
new file mode 100644
index 0000000..a7175a1
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -0,0 +1,778 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.plan.*;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.plan.visitor.ExplainLogicalPlanVisitor;
+import org.apache.tajo.plan.visitor.SimpleAlgebraVisitor;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+public class PlannerUtil {
+
+  public static boolean checkIfDDLPlan(LogicalNode node) {
+    LogicalNode baseNode = node;
+    if (node instanceof LogicalRootNode) {
+      baseNode = ((LogicalRootNode) node).getChild();
+    }
+
+    NodeType type = baseNode.getType();
+
+    return
+        type == NodeType.CREATE_DATABASE ||
+            type == NodeType.DROP_DATABASE ||
+            (type == NodeType.CREATE_TABLE && !((CreateTableNode) baseNode).hasSubQuery()) ||
+            baseNode.getType() == NodeType.DROP_TABLE ||
+            baseNode.getType() == NodeType.ALTER_TABLESPACE ||
+            baseNode.getType() == NodeType.ALTER_TABLE ||
+            baseNode.getType() == NodeType.TRUNCATE_TABLE;
+  }
+
+  /**
+   * Checks whether the query is simple or not.
+   * The simple query can be defined as 'select * from tb_name [LIMIT X]'.
+   *
+   * @param plan The logical plan
+   * @return True if the query is a simple query.
+   */
+  public static boolean checkIfSimpleQuery(LogicalPlan plan) {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+
+    // one block, without where clause, no group-by, no-sort, no-join
+    boolean isOneQueryBlock = plan.getQueryBlocks().size() == 1;
+    boolean simpleOperator = rootNode.getChild().getType() == NodeType.LIMIT
+        || rootNode.getChild().getType() == NodeType.SCAN || rootNode.getChild().getType() == NodeType.PARTITIONS_SCAN;
+    boolean noOrderBy = !plan.getRootBlock().hasNode(NodeType.SORT);
+    boolean noGroupBy = !plan.getRootBlock().hasNode(NodeType.GROUP_BY);
+    boolean noWhere = !plan.getRootBlock().hasNode(NodeType.SELECTION);
+    boolean noJoin = !plan.getRootBlock().hasNode(NodeType.JOIN);
+    boolean singleRelation =
+        (plan.getRootBlock().hasNode(NodeType.SCAN) || plan.getRootBlock().hasNode(NodeType.PARTITIONS_SCAN)) &&
+        PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1;
+
+    boolean noComplexComputation = false;
+    if (singleRelation) {
+      ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+      if (scanNode == null) {
+        scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
+      }
+      if (scanNode.hasTargets()) {
+        // If the number of columns in the select clause is s different from table schema,
+        // This query is not a simple query.
+        if (scanNode.getTableDesc().hasPartition()) {
+          // In the case of partitioned table, the actual number of columns is ScanNode.InSchema + partitioned columns
+          int numPartitionColumns = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema().size();
+          if (scanNode.getTargets().length != scanNode.getInSchema().size() + numPartitionColumns) {
+            return false;
+          }
+        } else {
+          if (scanNode.getTargets().length != scanNode.getInSchema().size()) {
+            return false;
+          }
+        }
+        noComplexComputation = true;
+        for (int i = 0; i < scanNode.getTargets().length; i++) {
+          noComplexComputation =
+              noComplexComputation && scanNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD;
+          if (noComplexComputation) {
+            noComplexComputation = noComplexComputation &&
+                scanNode.getTargets()[i].getNamedColumn().equals(
+                    scanNode.getTableDesc().getLogicalSchema().getColumn(i));
+          }
+          if (!noComplexComputation) {
+            return noComplexComputation;
+          }
+        }
+      }
+    }
+
+    return !checkIfDDLPlan(rootNode) &&
+        (simpleOperator && noComplexComputation && isOneQueryBlock &&
+            noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
+  }
+
+  /**
+   * Checks whether the query has 'from clause' or not.
+   *
+   * @param plan The logical plan
+   * @return True if a query does not have 'from clause'.
+   */
+  public static boolean checkIfNonFromQuery(LogicalPlan plan) {
+    LogicalNode node = plan.getRootBlock().getRoot();
+
+    // one block, without where clause, no group-by, no-sort, no-join
+    boolean isOneQueryBlock = plan.getQueryBlocks().size() == 1;
+    boolean noRelation = !plan.getRootBlock().hasAlgebraicExpr(OpType.Relation);
+
+    return !checkIfDDLPlan(node) && noRelation && isOneQueryBlock;
+  }
+
+  /**
+   * Get all RelationNodes which are descendant of a given LogicalNode.
+   *
+   * @param from The LogicalNode to start visiting LogicalNodes.
+   * @return an array of all descendant RelationNode of LogicalNode.
+   */
+  public static String[] getRelationLineage(LogicalNode from) {
+    LogicalNode[] scans = findAllNodes(from, NodeType.SCAN, NodeType.PARTITIONS_SCAN);
+    String[] tableNames = new String[scans.length];
+    ScanNode scan;
+    for (int i = 0; i < scans.length; i++) {
+      scan = (ScanNode) scans[i];
+      tableNames[i] = scan.getCanonicalName();
+    }
+    return tableNames;
+  }
+
+  /**
+   * Get all RelationNodes which are descendant of a given LogicalNode.
+   * The finding is restricted within a query block.
+   *
+   * @param from The LogicalNode to start visiting LogicalNodes.
+   * @return an array of all descendant RelationNode of LogicalNode.
+   */
+  public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode from)
+      throws PlanningException {
+    RelationFinderVisitor visitor = new RelationFinderVisitor();
+    visitor.visit(null, plan, null, from, new Stack<LogicalNode>());
+    return visitor.getFoundRelations();
+  }
+
+  public static class RelationFinderVisitor extends BasicLogicalPlanVisitor<Object, LogicalNode> {
+    private Set<String> foundRelNameSet = Sets.newHashSet();
+
+    public Set<String> getFoundRelations() {
+      return foundRelNameSet;
+    }
+
+    @Override
+    public LogicalNode visit(Object context, LogicalPlan plan, @Nullable LogicalPlan.QueryBlock block, LogicalNode node,
+                             Stack<LogicalNode> stack) throws PlanningException {
+      if (node.getType() != NodeType.TABLE_SUBQUERY) {
+        super.visit(context, plan, block, node, stack);
+      }
+
+      if (node instanceof RelationNode) {
+        foundRelNameSet.add(((RelationNode) node).getCanonicalName());
+      }
+
+      return node;
+    }
+  }
+
+  /**
+   * Delete the logical node from a plan.
+   *
+   * @param parent      this node must be a parent node of one node to be removed.
+   * @param tobeRemoved this node must be a child node of the parent.
+   */
+  public static LogicalNode deleteNode(LogicalNode parent, LogicalNode tobeRemoved) {
+    Preconditions.checkArgument(tobeRemoved instanceof UnaryNode,
+        "ERROR: the logical node to be removed must be unary node.");
+
+    UnaryNode child = (UnaryNode) tobeRemoved;
+    LogicalNode grandChild = child.getChild();
+    if (parent instanceof UnaryNode) {
+      UnaryNode unaryParent = (UnaryNode) parent;
+
+      Preconditions.checkArgument(unaryParent.getChild() == child,
+          "ERROR: both logical node must be parent and child nodes");
+      unaryParent.setChild(grandChild);
+
+    } else if (parent instanceof BinaryNode) {
+      BinaryNode binaryParent = (BinaryNode) parent;
+      if (binaryParent.getLeftChild().deepEquals(child)) {
+        binaryParent.setLeftChild(grandChild);
+      } else if (binaryParent.getRightChild().deepEquals(child)) {
+        binaryParent.setRightChild(grandChild);
+      } else {
+        throw new IllegalStateException("ERROR: both logical node must be parent and child nodes");
+      }
+    } else {
+      throw new InvalidQueryException("Unexpected logical plan: " + parent);
+    }
+    return child;
+  }
+
+  public static void replaceNode(LogicalPlan plan, LogicalNode startNode, LogicalNode oldNode, LogicalNode newNode) {
+    LogicalNodeReplaceVisitor replacer = new LogicalNodeReplaceVisitor(oldNode, newNode);
+    try {
+      replacer.visit(new ReplacerContext(), plan, null, startNode, new Stack<LogicalNode>());
+    } catch (PlanningException e) {
+      e.printStackTrace();
+    }
+  }
+
+  static class ReplacerContext {
+    boolean updateSchemaFlag = false;
+  }
+
+  public static class LogicalNodeReplaceVisitor extends BasicLogicalPlanVisitor<ReplacerContext, LogicalNode> {
+    private LogicalNode target;
+    private LogicalNode tobeReplaced;
+
+    public LogicalNodeReplaceVisitor(LogicalNode target, LogicalNode tobeReplaced) {
+      this.target = target;
+      this.tobeReplaced = tobeReplaced;
+    }
+
+    /**
+     * If this node can have child, it returns TRUE. Otherwise, it returns FALSE.
+     */
+    private static boolean checkIfVisitable(LogicalNode node) {
+      return node instanceof UnaryNode || node instanceof BinaryNode;
+    }
+
+    @Override
+    public LogicalNode visit(ReplacerContext context, LogicalPlan plan, @Nullable LogicalPlan.QueryBlock block,
+                             LogicalNode node, Stack<LogicalNode> stack) throws PlanningException {
+      LogicalNode left = null;
+      LogicalNode right = null;
+
+      if (node instanceof UnaryNode) {
+        UnaryNode unaryNode = (UnaryNode) node;
+        if (unaryNode.getChild().deepEquals(target)) {
+          unaryNode.setChild(tobeReplaced);
+          left = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(unaryNode.getChild())) {
+          left = visit(context, plan, null, unaryNode.getChild(), stack);
+        }
+      } else if (node instanceof BinaryNode) {
+        BinaryNode binaryNode = (BinaryNode) node;
+        if (binaryNode.getLeftChild().deepEquals(target)) {
+          binaryNode.setLeftChild(tobeReplaced);
+          left = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(binaryNode.getLeftChild())) {
+          left = visit(context, plan, null, binaryNode.getLeftChild(), stack);
+        } else {
+          left = binaryNode.getLeftChild();
+        }
+
+        if (binaryNode.getRightChild().deepEquals(target)) {
+          binaryNode.setRightChild(tobeReplaced);
+          right = tobeReplaced;
+          context.updateSchemaFlag = true;
+        } else if (checkIfVisitable(binaryNode.getRightChild())) {
+          right = visit(context, plan, null, binaryNode.getRightChild(), stack);
+        } else {
+          right = binaryNode.getRightChild();
+        }
+      }
+
+      // update schemas of nodes except for leaf node (i.e., RelationNode)
+      if (context.updateSchemaFlag) {
+        if (node instanceof Projectable) {
+          if (node instanceof BinaryNode) {
+            node.setInSchema(SchemaUtil.merge(left.getOutSchema(), right.getOutSchema()));
+          } else {
+            node.setInSchema(left.getOutSchema());
+          }
+          context.updateSchemaFlag = false;
+        } else {
+          node.setInSchema(left.getOutSchema());
+          node.setOutSchema(left.getOutSchema());
+        }
+      }
+      return node;
+    }
+
+    @Override
+    public LogicalNode visitScan(ReplacerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                                 Stack<LogicalNode> stack) throws PlanningException {
+      return node;
+    }
+
+    @Override
+    public LogicalNode visitPartitionedTableScan(ReplacerContext context, LogicalPlan plan, LogicalPlan.
+        QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack)
+
+        throws PlanningException {
+      return node;
+    }
+  }
+
+  public static void replaceNode(LogicalNode plan, LogicalNode newNode, NodeType type) {
+    LogicalNode parent = findTopParentNode(plan, type);
+    Preconditions.checkArgument(parent instanceof UnaryNode);
+    Preconditions.checkArgument(!(newNode instanceof BinaryNode));
+    UnaryNode parentNode = (UnaryNode) parent;
+    LogicalNode child = parentNode.getChild();
+    if (child instanceof UnaryNode) {
+      ((UnaryNode) newNode).setChild(((UnaryNode) child).getChild());
+    }
+    parentNode.setChild(newNode);
+  }
+
+  /**
+   * Find the top logical node matched to type from the given node
+   *
+   * @param node start node
+   * @param type to find
+   * @return a found logical node
+   */
+  public static <T extends LogicalNode> T findTopNode(LogicalNode node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.preOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(0);
+  }
+
+  /**
+   * Find the most bottom logical node matched to type from the given node
+   *
+   * @param node start node
+   * @param type to find
+   * @return a found logical node
+   */
+  public static <T extends LogicalNode> T findMostBottomNode(LogicalNode node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.preOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(finder.getFoundNodes().size() - 1);
+  }
+
+  /**
+   * Find the all logical node matched to type from the given node
+   *
+   * @param node start node
+   * @param type to find
+   * @return a found logical node
+   */
+  public static LogicalNode[] findAllNodes(LogicalNode node, NodeType... type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    LogicalNodeFinder finder = new LogicalNodeFinder(type);
+    node.postOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return new LogicalNode[]{};
+    }
+    List<LogicalNode> founds = finder.getFoundNodes();
+    return founds.toArray(new LogicalNode[founds.size()]);
+  }
+
+  /**
+   * Find a parent node of a given-typed operator.
+   *
+   * @param node start node
+   * @param type to find
+   * @return the parent node of a found logical node
+   */
+  public static <T extends LogicalNode> T findTopParentNode(LogicalNode node, NodeType type) {
+    Preconditions.checkNotNull(node);
+    Preconditions.checkNotNull(type);
+
+    ParentNodeFinder finder = new ParentNodeFinder(type);
+    node.postOrder(finder);
+
+    if (finder.getFoundNodes().size() == 0) {
+      return null;
+    }
+    return (T) finder.getFoundNodes().get(0);
+  }
+
+  private static class LogicalNodeFinder implements LogicalNodeVisitor {
+    private List<LogicalNode> list = new ArrayList<LogicalNode>();
+    private final NodeType[] tofind;
+    private boolean topmost = false;
+    private boolean finished = false;
+
+    public LogicalNodeFinder(NodeType... type) {
+      this.tofind = type;
+    }
+
+    public LogicalNodeFinder(NodeType[] type, boolean topmost) {
+      this(type);
+      this.topmost = topmost;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      if (!finished) {
+        for (NodeType type : tofind) {
+          if (node.getType() == type) {
+            list.add(node);
+          }
+          if (topmost && list.size() > 0) {
+            finished = true;
+          }
+        }
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return list;
+    }
+
+    public LogicalNode[] getFoundNodeArray() {
+      return list.toArray(new LogicalNode[list.size()]);
+    }
+  }
+
+  private static class ParentNodeFinder implements LogicalNodeVisitor {
+    private List<LogicalNode> list = new ArrayList<LogicalNode>();
+    private NodeType tofind;
+
+    public ParentNodeFinder(NodeType type) {
+      this.tofind = type;
+    }
+
+    @Override
+    public void visit(LogicalNode node) {
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        if (unary.getChild().getType() == tofind) {
+          list.add(node);
+        }
+      } else if (node instanceof BinaryNode) {
+        BinaryNode bin = (BinaryNode) node;
+        if (bin.getLeftChild().getType() == tofind ||
+            bin.getRightChild().getType() == tofind) {
+          list.add(node);
+        }
+      }
+    }
+
+    public List<LogicalNode> getFoundNodes() {
+      return list;
+    }
+  }
+
+  /**
+   * fill targets with FieldEvals from a given schema
+   *
+   * @param schema  to be transformed to targets
+   * @param targets to be filled
+   */
+  public static void schemaToTargets(Schema schema, Target[] targets) {
+    FieldEval eval;
+    for (int i = 0; i < schema.size(); i++) {
+      eval = new FieldEval(schema.getColumn(i));
+      targets[i] = new Target(eval);
+    }
+  }
+
+  public static Target[] schemaToTargets(Schema schema) {
+    Target[] targets = new Target[schema.size()];
+
+    FieldEval eval;
+    for (int i = 0; i < schema.size(); i++) {
+      eval = new FieldEval(schema.getColumn(i));
+      targets[i] = new Target(eval);
+    }
+    return targets;
+  }
+
+  public static Target[] schemaToTargetsWithGeneratedFields(Schema schema) {
+    List<Target> targets = TUtil.newList();
+
+    FieldEval eval;
+    for (int i = 0; i < schema.size(); i++) {
+      eval = new FieldEval(schema.getColumn(i));
+      targets.add(new Target(eval));
+    }
+    return targets.toArray(new Target[targets.size()]);
+  }
+
+  public static SortSpec[] schemaToSortSpecs(Schema schema) {
+    return columnsToSortSpecs(schema.toArray());
+  }
+
+  public static SortSpec[] columnsToSortSpecs(Column[] columns) {
+    SortSpec[] specs = new SortSpec[columns.length];
+
+    for (int i = 0; i < columns.length; i++) {
+      specs[i] = new SortSpec(columns[i], true, false);
+    }
+
+    return specs;
+  }
+
+  public static SortSpec[] columnsToSortSpecs(Collection<Column> columns) {
+    return columnsToSortSpecs(columns.toArray(new Column[columns.size()]));
+  }
+
+  public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) {
+    Schema schema = new Schema();
+    for (SortSpec spec : sortSpecs) {
+      schema.addColumn(spec.getSortKey());
+    }
+
+    return schema;
+  }
+
+  public static SortSpec[][] getSortKeysFromJoinQual(EvalNode joinQual, Schema outer, Schema inner) {
+    // It is used for the merge join executor. The merge join only considers the equi-join.
+    // So, theta-join flag must be false.
+    List<Column[]> joinKeyPairs = getJoinKeyPairs(joinQual, outer, inner, false);
+    SortSpec[] outerSortSpec = new SortSpec[joinKeyPairs.size()];
+    SortSpec[] innerSortSpec = new SortSpec[joinKeyPairs.size()];
+
+    for (int i = 0; i < joinKeyPairs.size(); i++) {
+      outerSortSpec[i] = new SortSpec(joinKeyPairs.get(i)[0]);
+      innerSortSpec[i] = new SortSpec(joinKeyPairs.get(i)[1]);
+    }
+
+    return new SortSpec[][]{outerSortSpec, innerSortSpec};
+  }
+
+  /**
+   * @return the first array contains left table's columns, and the second array contains right table's columns.
+   */
+  public static Column[][] joinJoinKeyForEachTable(EvalNode joinQual, Schema leftSchema,
+                                                   Schema rightSchema, boolean includeThetaJoin) {
+    List<Column[]> joinKeys = getJoinKeyPairs(joinQual, leftSchema, rightSchema, includeThetaJoin);
+    Column[] leftColumns = new Column[joinKeys.size()];
+    Column[] rightColumns = new Column[joinKeys.size()];
+    for (int i = 0; i < joinKeys.size(); i++) {
+      leftColumns[i] = joinKeys.get(i)[0];
+      rightColumns[i] = joinKeys.get(i)[1];
+    }
+
+    return new Column[][]{leftColumns, rightColumns};
+  }
+
+  public static List<Column[]> getJoinKeyPairs(EvalNode joinQual, Schema leftSchema, Schema rightSchema,
+                                               boolean includeThetaJoin) {
+    JoinKeyPairFinder finder = new JoinKeyPairFinder(includeThetaJoin, leftSchema, rightSchema);
+    joinQual.preOrder(finder);
+    return finder.getPairs();
+  }
+
+  public static class JoinKeyPairFinder implements EvalNodeVisitor {
+    private boolean includeThetaJoin;
+    private final List<Column[]> pairs = Lists.newArrayList();
+    private Schema[] schemas = new Schema[2];
+
+    public JoinKeyPairFinder(boolean includeThetaJoin, Schema outer, Schema inner) {
+      this.includeThetaJoin = includeThetaJoin;
+      schemas[0] = outer;
+      schemas[1] = inner;
+    }
+
+    @Override
+    public void visit(EvalNode node) {
+      if (EvalTreeUtil.isJoinQual(node, includeThetaJoin)) {
+        BinaryEval binaryEval = (BinaryEval) node;
+        Column[] pair = new Column[2];
+
+        for (int i = 0; i <= 1; i++) { // access left, right sub expression
+          Column column = EvalTreeUtil.findAllColumnRefs(binaryEval.getChild(i)).get(0);
+          for (int j = 0; j < schemas.length; j++) {
+            // check whether the column is for either outer or inner
+            // 0 is outer, and 1 is inner
+            if (schemas[j].contains(column.getQualifiedName())) {
+              pair[j] = column;
+            }
+          }
+        }
+
+        if (pair[0] == null || pair[1] == null) {
+          throw new IllegalStateException("Wrong join key: " + node);
+        }
+        pairs.add(pair);
+      }
+    }
+
+    public List<Column[]> getPairs() {
+      return this.pairs;
+    }
+  }
+
+  public static Schema targetToSchema(Collection<Target> targets) {
+    return targetToSchema(targets.toArray(new Target[targets.size()]));
+  }
+
+  public static Schema targetToSchema(Target[] targets) {
+    Schema schema = new Schema();
+    for (Target t : targets) {
+      DataType type = t.getEvalTree().getValueType();
+      String name;
+      if (t.hasAlias()) {
+        name = t.getAlias();
+      } else {
+        name = t.getEvalTree().getName();
+      }
+      if (!schema.containsByQualifiedName(name)) {
+        schema.addColumn(name, type);
+      }
+    }
+
+    return schema;
+  }
+
+  /**
+   * It removes all table names from FieldEvals in targets
+   *
+   * @param sourceTargets The targets to be stripped
+   * @return The stripped targets
+   */
+  public static Target[] stripTarget(Target[] sourceTargets) {
+    Target[] copy = new Target[sourceTargets.length];
+    for (int i = 0; i < sourceTargets.length; i++) {
+      try {
+        copy[i] = (Target) sourceTargets[i].clone();
+      } catch (CloneNotSupportedException e) {
+        throw new InternalError(e.getMessage());
+      }
+      if (copy[i].getEvalTree().getType() == EvalType.FIELD) {
+        FieldEval fieldEval = copy[i].getEvalTree();
+        if (fieldEval.getColumnRef().hasQualifier()) {
+          fieldEval.replaceColumnRef(fieldEval.getColumnName());
+        }
+      }
+    }
+
+    return copy;
+  }
+
+  public static <T extends LogicalNode> T clone(LogicalPlan plan, LogicalNode node) {
+    try {
+      T copy = (T) node.clone();
+      if (plan == null) {
+        copy.setPID(-1);
+      } else {
+        copy.setPID(plan.newPID());
+        if (node instanceof DistinctGroupbyNode) {
+          DistinctGroupbyNode dNode = (DistinctGroupbyNode)copy;
+          for (GroupbyNode eachNode: dNode.getGroupByNodes()) {
+            eachNode.setPID(plan.newPID());
+          }
+        }
+      }
+      return copy;
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static boolean isCommutativeJoin(JoinType joinType) {
+    return joinType == JoinType.INNER;
+  }
+
+  public static boolean existsAggregationFunction(Expr expr) throws PlanningException {
+    AggregationFunctionFinder finder = new AggregationFunctionFinder();
+    AggFunctionFoundResult result = new AggFunctionFoundResult();
+    finder.visit(result, new Stack<Expr>(), expr);
+    return result.generalSetFunction;
+  }
+
+  public static boolean existsDistinctAggregationFunction(Expr expr) throws PlanningException {
+    AggregationFunctionFinder finder = new AggregationFunctionFinder();
+    AggFunctionFoundResult result = new AggFunctionFoundResult();
+    finder.visit(result, new Stack<Expr>(), expr);
+    return result.distinctSetFunction;
+  }
+
+  static class AggFunctionFoundResult {
+    boolean generalSetFunction;
+    boolean distinctSetFunction;
+  }
+
+  static class AggregationFunctionFinder extends SimpleAlgebraVisitor<AggFunctionFoundResult, Object> {
+    @Override
+    public Object visitCountRowsFunction(AggFunctionFoundResult ctx, Stack<Expr> stack, CountRowsFunctionExpr expr)
+        throws PlanningException {
+      ctx.generalSetFunction = true;
+      return super.visitCountRowsFunction(ctx, stack, expr);
+    }
+
+    @Override
+    public Object visitGeneralSetFunction(AggFunctionFoundResult ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
+        throws PlanningException {
+      ctx.generalSetFunction = true;
+      ctx.distinctSetFunction = expr.isDistinct();
+      return super.visitGeneralSetFunction(ctx, stack, expr);
+    }
+  }
+
+  public static Collection<String> toQualifiedFieldNames(Collection<String> fieldNames, String qualifier) {
+    List<String> names = TUtil.newList();
+    for (String n : fieldNames) {
+      String[] parts = n.split("\\.");
+      if (parts.length == 1) {
+        names.add(qualifier + "." + parts[0]);
+      } else {
+        names.add(qualifier + "." + parts[1]);
+      }
+    }
+    return names;
+  }
+
+  public static SortSpec[] convertSortSpecs(Collection<CatalogProtos.SortSpecProto> sortSpecProtos) {
+    SortSpec[] sortSpecs = new SortSpec[sortSpecProtos.size()];
+    int i = 0;
+    for (CatalogProtos.SortSpecProto proto : sortSpecProtos) {
+      sortSpecs[i++] = new SortSpec(proto);
+    }
+    return sortSpecs;
+  }
+
+  /**
+   * Generate an explain string of a LogicalNode and its descendant nodes.
+   *
+   * @param node The LogicalNode instance to be started
+   * @return A pretty print explain string
+   */
+  public static String buildExplainString(LogicalNode node) {
+    ExplainLogicalPlanVisitor explain = new ExplainLogicalPlanVisitor();
+
+    StringBuilder explains = new StringBuilder();
+    try {
+      ExplainLogicalPlanVisitor.Context explainContext = explain.getBlockPlanStrings(null, node);
+      while (!explainContext.explains.empty()) {
+        explains.append(
+            ExplainLogicalPlanVisitor.printDepthString(explainContext.getMaxDepth(), explainContext.explains.pop()));
+      }
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+
+    return explains.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java
new file mode 100644
index 0000000..78135ab
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/SchemaUtil.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.util;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+
+public class SchemaUtil {
+  // See TAJO-914 bug.
+  //
+  // Its essential problem is that constant value is evaluated multiple times at each scan.
+  // As a result, join nodes can take the child nodes which have the same named fields.
+  // Because current schema does not allow the same name and ignore the duplicated schema,
+  // it finally causes the in-out schema mismatch between the parent and child nodes.
+  //
+  // tmpColumnSeq is a hack to avoid the above problem by keeping duplicated constant values as different name fields.
+  // The essential solution would be https://issues.apache.org/jira/browse/TAJO-895.
+  static int tmpColumnSeq = 0;
+  public static Schema merge(Schema left, Schema right) {
+    Schema merged = new Schema();
+    for(Column col : left.getColumns()) {
+      if (!merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn(col);
+      }
+    }
+    for(Column col : right.getColumns()) {
+      if (merged.containsByQualifiedName(col.getQualifiedName())) {
+        merged.addColumn("?fake" + (tmpColumnSeq++), col.getDataType());
+      } else {
+        merged.addColumn(col);
+      }
+    }
+
+    // if overflow
+    if (tmpColumnSeq < 0) {
+      tmpColumnSeq = 0;
+    }
+    return merged;
+  }
+
+  /**
+   * Get common columns to be used as join keys of natural joins.
+   */
+  public static Schema getNaturalJoinColumns(Schema left, Schema right) {
+    Schema common = new Schema();
+    for (Column outer : left.getColumns()) {
+      if (!common.containsByName(outer.getSimpleName()) && right.containsByName(outer.getSimpleName())) {
+        common.addColumn(new Column(outer.getSimpleName(), outer.getDataType()));
+      }
+    }
+    
+    return common;
+  }
+
+  public static Schema getQualifiedLogicalSchema(TableDesc tableDesc, String tableName) {
+    Schema logicalSchema = new Schema(tableDesc.getLogicalSchema());
+    if (tableName != null) {
+      logicalSchema.setQualifier(tableName);
+    }
+    return logicalSchema;
+  }
+
+  public static <T extends Schema> T clone(Schema schema) {
+    try {
+      T copy = (T) schema.clone();
+      return copy;
+    } catch (CloneNotSupportedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/ExprsVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/ExprsVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/ExprsVerifier.java
new file mode 100644
index 0000000..ed8a702
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/ExprsVerifier.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.verifier;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.LogicalNode;
+
+import java.util.Set;
+import java.util.Stack;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+import static org.apache.tajo.common.TajoDataTypes.Type;
+
+/**
+ * It verifies one predicate or expression with the semantic and data type checks as follows:
+ * <ul>
+ *   <ul>Both expressions in a binary expression are compatible to each other</ul>
+ *   <ul>All column references of one expression are avilable at this node</ul>
+ * </ul>
+ */
+public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalNode> {
+  private static final ExprsVerifier instance;
+
+  static {
+    instance = new ExprsVerifier();
+  }
+
+  public static VerificationState verify(VerificationState state, LogicalNode currentNode, EvalNode expression)
+      throws PlanningException {
+    instance.visitChild(state, expression, new Stack<EvalNode>());
+    Set<Column> referredColumns = EvalTreeUtil.findUniqueColumns(expression);
+    for (Column referredColumn : referredColumns) {
+      if (!currentNode.getInSchema().contains(referredColumn)) {
+        throw new PlanningException("Invalid State: " + referredColumn + " cannot be accessible at Node ("
+            + currentNode.getPID() + ")");
+      }
+    }
+    return state;
+  }
+
+  /**
+   * It checks the compatibility of two data types.
+   */
+  private static boolean isCompatibleType(DataType dataType1, DataType dataType2) {
+    if (checkNumericType(dataType1) && checkNumericType(dataType2)) {
+      return true;
+    }
+
+    if (checkTextData(dataType1) && checkTextData(dataType2)) {
+      return true;
+    }
+
+    if (checkDateTime(dataType1) && checkDateTime(dataType2)) {
+      return true;
+    }
+
+    if (checkNetworkType(dataType1) && checkNetworkType(dataType2)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * It checks both expressions in a comparison operator are compatible to each other.
+   */
+  private static void verifyComparisonOperator(VerificationState state, BinaryEval expr) {
+    DataType leftType = expr.getLeftExpr().getValueType();
+    DataType rightType = expr.getRightExpr().getValueType();
+    if (!isCompatibleType(leftType, rightType)) {
+      state.addVerification("No operator matches the given name and argument type(s): " + expr.toString());
+    }
+  }
+
+  public EvalNode visitEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  public EvalNode visitNotEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitNotEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitLessThan(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitLessThan(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitLessThanOrEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitLessThanOrEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitGreaterThan(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitGreaterThan(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitGreaterThanOrEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitGreaterThanOrEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  private static void checkDivisionByZero(VerificationState state, BinaryEval evalNode) {
+    if (evalNode.getRightExpr().getType() == EvalType.CONST) {
+      ConstEval constEval = evalNode.getRightExpr();
+      if (constEval.getValue().asFloat8() == 0) {
+        state.addVerification("division by zero");
+      }
+    }
+  }
+
+  private static void checkArithmeticOperand(VerificationState state, BinaryEval evalNode) {
+    EvalNode leftExpr = evalNode.getLeftExpr();
+    EvalNode rightExpr = evalNode.getRightExpr();
+
+    DataType leftDataType = leftExpr.getValueType();
+    DataType rightDataType = rightExpr.getValueType();
+
+    Type leftType = leftDataType.getType();
+    Type rightType = rightDataType.getType();
+
+    if (leftType == Type.DATE &&
+          (checkIntType(rightDataType) ||
+              rightType == Type.DATE || rightType == Type.INTERVAL || rightType == Type.TIME)) {
+      return;
+    }
+
+    if (leftType == Type.INTERVAL &&
+        (checkNumericType(rightDataType) ||
+            rightType == Type.DATE || rightType == Type.INTERVAL || rightType == Type.TIME ||
+            rightType == Type.TIMESTAMP)) {
+      return;
+    }
+
+    if (leftType == Type.TIME &&
+        (rightType == Type.DATE || rightType == Type.INTERVAL || rightType == Type.TIME)) {
+      return;
+    }
+
+    if (leftType == Type.TIMESTAMP &&
+        (rightType == Type.TIMESTAMP || rightType == Type.INTERVAL || rightType == Type.TIME)) {
+      return;
+    }
+
+    if (!(checkNumericType(leftDataType) && checkNumericType(rightDataType))) {
+      state.addVerification("No operator matches the given name and argument type(s): " + evalNode.toString());
+    }
+  }
+
+  private static boolean checkNetworkType(DataType dataType) {
+    return dataType.getType() == Type.INET4 || dataType.getType() == Type.INET6;
+  }
+
+  private static boolean checkIntType(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return Type.INT1.getNumber() < typeNumber && typeNumber <= Type.INT8.getNumber();
+  }
+
+  private static boolean checkNumericType(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return Type.INT1.getNumber() <= typeNumber && typeNumber <= Type.NUMERIC.getNumber();
+  }
+
+  private static boolean checkTextData(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return Type.CHAR.getNumber() <= typeNumber && typeNumber <= Type.TEXT.getNumber();
+  }
+
+  private static boolean checkDateTime(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return (Type.DATE.getNumber() <= typeNumber && typeNumber <= Type.INTERVAL.getNumber()) ||
+        (Type.TIMEZ.getNumber() <= typeNumber && typeNumber <= Type.TIMESTAMPZ.getNumber());
+  }
+
+  @Override
+  public EvalNode visitPlus(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitPlus(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitMinus(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitMinus(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitMultiply(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitMultiply(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitDivide(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitDivide(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    checkDivisionByZero(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitModular(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitDivide(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    checkDivisionByZero(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitFuncCall(VerificationState context, GeneralFunctionEval evalNode, Stack<EvalNode> stack) {
+    super.visitFuncCall(context, evalNode, stack);
+    if (evalNode.getArgs() != null) {
+      for (EvalNode param : evalNode.getArgs()) {
+        visitChild(context, param, stack);
+      }
+    }
+    return evalNode;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java
new file mode 100644
index 0000000..b6912a7
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/LogicalPlanVerifier.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.verifier;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+
+import java.util.Stack;
+
+public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<LogicalPlanVerifier.Context, LogicalNode> {
+  private TajoConf conf;
+  private CatalogService catalog;
+
+  public LogicalPlanVerifier(TajoConf conf, CatalogService catalog) {
+    this.conf = conf;
+    this.catalog = catalog;
+  }
+
+  public static class Context {
+    OverridableConf queryContext;
+    VerificationState state;
+
+    public Context(OverridableConf queryContext, VerificationState state) {
+      this.queryContext = this.queryContext;
+      this.state = state;
+    }
+  }
+
+  public VerificationState verify(OverridableConf queryContext, VerificationState state, LogicalPlan plan)
+      throws PlanningException {
+    Context context = new Context(queryContext, state);
+    visit(context, plan, plan.getRootBlock());
+    return context.state;
+  }
+
+  /**
+   * It checks if an output schema of a projectable node and target's output data types are equivalent to each other.
+   */
+  private static void verifyProjectableOutputSchema(Projectable node) throws PlanningException {
+
+    Schema outputSchema = node.getOutSchema();
+    Schema targetSchema = PlannerUtil.targetToSchema(node.getTargets());
+
+    if (outputSchema.size() != node.getTargets().length) {
+      throw new PlanningException(String.format("Output schema and Target's schema are mismatched at Node (%d)",
+          + node.getPID()));
+    }
+
+    for (int i = 0; i < outputSchema.size(); i++) {
+      if (!outputSchema.getColumn(i).getDataType().equals(targetSchema.getColumn(i).getDataType())) {
+        Column targetColumn = targetSchema.getColumn(i);
+        Column insertColumn = outputSchema.getColumn(i);
+        throw new PlanningException("ERROR: " +
+            insertColumn.getSimpleName() + " is of type " + insertColumn.getDataType().getType().name() +
+            ", but target column '" + targetColumn.getSimpleName() + "' is of type " +
+            targetColumn.getDataType().getType().name());
+      }
+    }
+  }
+
+  @Override
+  public LogicalNode visitProjection(Context state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitProjection(state, plan, block, node, stack);
+
+    for (Target target : node.getTargets()) {
+      ExprsVerifier.verify(state.state, node, target.getEvalTree());
+    }
+
+    verifyProjectableOutputSchema(node);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitLimit(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                LimitNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitLimit(context, plan, block, node, stack);
+
+    if (node.getFetchFirstNum() < 0) {
+      context.state.addVerification("LIMIT must not be negative");
+    }
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                  GroupbyNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitGroupBy(context, plan, block, node, stack);
+
+    verifyProjectableOutputSchema(node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    visit(context, plan, block, node.getChild(), stack);
+    ExprsVerifier.verify(context.state, node, node.getQual());
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    visit(context, plan, block, node.getLeftChild(), stack);
+    visit(context, plan, block, node.getRightChild(), stack);
+
+    if (node.hasJoinQual()) {
+      ExprsVerifier.verify(context.state, node, node.getJoinQual());
+    }
+
+    verifyProjectableOutputSchema(node);
+
+    return node;
+  }
+
+  private void verifySetStatement(VerificationState state, BinaryNode setNode) {
+    Preconditions.checkArgument(setNode.getType() == NodeType.UNION || setNode.getType() == NodeType.INTERSECT ||
+      setNode.getType() == NodeType.EXCEPT);
+    Schema left = setNode.getLeftChild().getOutSchema();
+    Schema right = setNode.getRightChild().getOutSchema();
+    NodeType type = setNode.getType();
+
+    if (left.size() != right.size()) {
+      state.addVerification("each " + type.name() + " query must have the same number of columns");
+      return;
+    }
+
+    Column[] leftColumns = left.toArray();
+    Column[] rightColumns = right.toArray();
+
+    for (int i = 0; i < leftColumns.length; i++) {
+      if (!leftColumns[i].getDataType().equals(rightColumns[i].getDataType())) {
+        state.addVerification(type + " types " + leftColumns[i].getDataType().getType() + " and "
+            + rightColumns[i].getDataType().getType() + " cannot be matched");
+      }
+    }
+  }
+
+  @Override
+  public LogicalNode visitUnion(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                UnionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitUnion(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitExcept(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 ExceptNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitExcept(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitIntersect(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    IntersectNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitIntersect(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitTableSubQuery(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                   TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitTableSubQuery(context, plan, block, node, stack);
+    if (node.hasTargets()) {
+      for (Target target : node.getTargets()) {
+        ExprsVerifier.verify(context.state, node, target.getEvalTree());
+      }
+    }
+
+    verifyProjectableOutputSchema(node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    if (node.hasTargets()) {
+      for (Target target : node.getTargets()) {
+        ExprsVerifier.verify(context.state, node, target.getEvalTree());
+      }
+    }
+
+    if (node.hasQual()) {
+      ExprsVerifier.verify(context.state, node, node.getQual());
+    }
+
+    verifyProjectableOutputSchema(node);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitStoreTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     StoreTableNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitStoreTable(context, plan, block, node, stack);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 InsertNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitInsert(context, plan, block, node, stack);
+    return node;
+  }
+
+  /**
+   * This ensures that corresponding columns in both tables are equivalent to each other.
+   */
+  private static void ensureDomains(VerificationState state, Schema targetTableScheme, Schema schema)
+      throws PlanningException {
+    for (int i = 0; i < schema.size(); i++) {
+      if (!schema.getColumn(i).getDataType().equals(targetTableScheme.getColumn(i).getDataType())) {
+        Column targetColumn = targetTableScheme.getColumn(i);
+        Column insertColumn = schema.getColumn(i);
+        state.addVerification("ERROR: " +
+            insertColumn.getSimpleName() + " is of type " + insertColumn.getDataType().getType().name() +
+            ", but target column '" + targetColumn.getSimpleName() + "' is of type " +
+            targetColumn.getDataType().getType().name());
+      }
+    }
+  }
+
+  @Override
+  public LogicalNode visitCreateTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      CreateTableNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitCreateTable(context, plan, block, node, stack);
+    // here, we don't need check table existence because this check is performed in PreLogicalPlanVerifier.
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitDropTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    DropTableNode node, Stack<LogicalNode> stack) {
+    // here, we don't need check table existence because this check is performed in PreLogicalPlanVerifier.
+    return node;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
new file mode 100644
index 0000000..95e0f30
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.verifier;
+
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.plan.util.ExprFinder;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Set;
+import java.util.Stack;
+
+public class PreLogicalPlanVerifier extends BaseAlgebraVisitor<PreLogicalPlanVerifier.Context, Expr> {
+  private CatalogService catalog;
+
+  public PreLogicalPlanVerifier(CatalogService catalog) {
+    this.catalog = catalog;
+  }
+
+  public static class Context {
+    OverridableConf queryContext;
+    VerificationState state;
+
+    public Context(OverridableConf queryContext, VerificationState state) {
+      this.queryContext = queryContext;
+      this.state = state;
+    }
+  }
+
+  public VerificationState verify(OverridableConf queryContext, VerificationState state, Expr expr)
+      throws PlanningException {
+    Context context = new Context(queryContext, state);
+    visit(context, new Stack<Expr>(), expr);
+    return context.state;
+  }
+
+  public Expr visitProjection(Context context, Stack<Expr> stack, Projection expr) throws PlanningException {
+    super.visitProjection(context, stack, expr);
+
+    Set<String> names = TUtil.newHashSet();
+    Expr [] distinctValues = null;
+
+    for (NamedExpr namedExpr : expr.getNamedExprs()) {
+
+      if (namedExpr.hasAlias()) {
+        if (names.contains(namedExpr.getAlias())) {
+          context.state.addVerification(String.format("column name \"%s\" specified more than once",
+              namedExpr.getAlias()));
+        } else {
+          names.add(namedExpr.getAlias());
+        }
+      }
+
+      Set<GeneralSetFunctionExpr> exprs = ExprFinder.finds(namedExpr.getExpr(), OpType.GeneralSetFunction);
+
+      // Currently, avg functions with distinct aggregation are not supported.
+      // This code does not allow users to use avg functions with distinct aggregation.
+      if (distinctValues != null) {
+        for (GeneralSetFunctionExpr setFunction : exprs) {
+          if (setFunction.getSignature().equalsIgnoreCase("avg")) {
+            if (setFunction.isDistinct()) {
+              throw new PlanningException("avg(distinct) function is not supported yet.");
+            } else {
+              throw new PlanningException("avg() function with distinct aggregation functions is not supported yet.");
+            }
+          }
+        }
+      }
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitLimit(Context context, Stack<Expr> stack, Limit expr) throws PlanningException {
+    stack.push(expr);
+
+    if (ExprFinder.finds(expr.getFetchFirstNum(), OpType.Column).size() > 0) {
+      context.state.addVerification("argument of LIMIT must not contain variables");
+    }
+
+    visit(context, stack, expr.getFetchFirstNum());
+    Expr result = visit(context, stack, expr.getChild());
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public Expr visitGroupBy(Context context, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+    super.visitGroupBy(context, stack, expr);
+
+    // Enforcer only ordinary grouping set.
+    for (Aggregation.GroupElement groupingElement : expr.getGroupSet()) {
+      if (groupingElement.getType() != Aggregation.GroupType.OrdinaryGroup) {
+        context.state.addVerification(groupingElement.getType() + " is not supported yet");
+      }
+    }
+
+    Projection projection = null;
+    for (Expr parent : stack) {
+      if (parent.getType() == OpType.Projection) {
+        projection = (Projection) parent;
+        break;
+      }
+    }
+
+    if (projection == null) {
+      throw new PlanningException("No Projection");
+    }
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitRelation(Context context, Stack<Expr> stack, Relation expr) throws PlanningException {
+    assertRelationExistence(context, expr.getName());
+    return expr;
+  }
+
+  private boolean assertRelationExistence(Context context, String tableName) {
+    String qualifiedName;
+
+    if (CatalogUtil.isFQTableName(tableName)) {
+      qualifiedName = tableName;
+    } else {
+      qualifiedName = CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE), tableName);
+    }
+
+    if (!catalog.existsTable(qualifiedName)) {
+      context.state.addVerification(String.format("relation \"%s\" does not exist", qualifiedName));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertRelationNoExistence(Context context, String tableName) {
+    String qualifiedName;
+
+    if (CatalogUtil.isFQTableName(tableName)) {
+      qualifiedName = tableName;
+    } else {
+      qualifiedName = CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE), tableName);
+    }
+    if(qualifiedName == null) {
+      System.out.println("A");
+    }
+    if (catalog.existsTable(qualifiedName)) {
+      context.state.addVerification(String.format("relation \"%s\" already exists", qualifiedName));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertUnsupportedStoreType(VerificationState state, String name) {
+    if (name != null && name.equals(CatalogProtos.StoreType.RAW.name())) {
+      state.addVerification(String.format("Unsupported store type :%s", name));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertDatabaseExistence(VerificationState state, String name) {
+    if (!catalog.existDatabase(name)) {
+      state.addVerification(String.format("database \"%s\" does not exist", name));
+      return false;
+    }
+    return true;
+  }
+
+  private boolean assertDatabaseNoExistence(VerificationState state, String name) {
+    if (catalog.existDatabase(name)) {
+      state.addVerification(String.format("database \"%s\" already exists", name));
+      return false;
+    }
+    return true;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Definition Language Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+
+  @Override
+  public Expr visitCreateDatabase(Context context, Stack<Expr> stack, CreateDatabase expr)
+      throws PlanningException {
+    super.visitCreateDatabase(context, stack, expr);
+    if (!expr.isIfNotExists()) {
+      assertDatabaseNoExistence(context.state, expr.getDatabaseName());
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitDropDatabase(Context context, Stack<Expr> stack, DropDatabase expr) throws PlanningException {
+    super.visitDropDatabase(context, stack, expr);
+    if (!expr.isIfExists()) {
+      assertDatabaseExistence(context.state, expr.getDatabaseName());
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitCreateTable(Context context, Stack<Expr> stack, CreateTable expr) throws PlanningException {
+    super.visitCreateTable(context, stack, expr);
+    if (!expr.isIfNotExists()) {
+      assertRelationNoExistence(context, expr.getTableName());
+    }
+    assertUnsupportedStoreType(context.state, expr.getStorageType());
+    return expr;
+  }
+
+  @Override
+  public Expr visitDropTable(Context context, Stack<Expr> stack, DropTable expr) throws PlanningException {
+    super.visitDropTable(context, stack, expr);
+    if (!expr.isIfExists()) {
+      assertRelationExistence(context, expr.getTableName());
+    }
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Insert or Update Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public Expr visitInsert(Context context, Stack<Expr> stack, Insert expr) throws PlanningException {
+    Expr child = super.visitInsert(context, stack, expr);
+
+    if (expr.hasTableName()) {
+      assertRelationExistence(context, expr.getTableName());
+    }
+
+    if (child != null && child.getType() == OpType.Projection) {
+      Projection projection = (Projection) child;
+      int projectColumnNum = projection.getNamedExprs().length;
+
+      if (expr.hasTargetColumns()) {
+        int targetColumnNum = expr.getTargetColumns().length;
+
+        if (targetColumnNum > projectColumnNum)  {
+          context.state.addVerification("INSERT has more target columns than expressions");
+        } else if (targetColumnNum < projectColumnNum) {
+          context.state.addVerification("INSERT has more expressions than target columns");
+        }
+      } else {
+        if (expr.hasTableName()) {
+          String qualifiedName = expr.getTableName();
+          if (TajoConstants.EMPTY_STRING.equals(CatalogUtil.extractQualifier(expr.getTableName()))) {
+            qualifiedName = CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE),
+                expr.getTableName());
+          }
+
+          TableDesc table = catalog.getTableDesc(qualifiedName);
+          if (table == null) {
+            context.state.addVerification(String.format("relation \"%s\" does not exist", qualifiedName));
+            return null;
+          }
+          if (table.hasPartition()) {
+            int columnSize = table.getSchema().getColumns().size();
+            columnSize += table.getPartitionMethod().getExpressionSchema().getColumns().size();
+            if (projectColumnNum < columnSize) {
+              context.state.addVerification("INSERT has smaller expressions than target columns");
+            } else if (projectColumnNum > columnSize) {
+              context.state.addVerification("INSERT has more expressions than target columns");
+            }
+          }
+        }
+      }
+    }
+
+    return expr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerificationState.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerificationState.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerificationState.java
new file mode 100644
index 0000000..a27b200
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerificationState.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.verifier;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.TUtil;
+
+import java.util.List;
+
+public class VerificationState {
+  private static final Log LOG = LogFactory.getLog(VerificationState.class);
+  List<String> errorMessages = Lists.newArrayList();
+
+  public void addVerification(String error) {
+    LOG.warn(TUtil.getCurrentCodePoint(1) + " causes: " + error);
+    errorMessages.add(error);
+  }
+
+  public boolean verified() {
+    return errorMessages.size() == 0;
+  }
+
+  public List<String> getErrorMessages() {
+    return errorMessages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerifyException.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerifyException.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerifyException.java
new file mode 100644
index 0000000..2e7a9c2
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/VerifyException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.verifier;
+
+import org.apache.tajo.plan.PlanningException;
+
+public class VerifyException extends PlanningException {
+  public VerifyException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b143f991/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
new file mode 100644
index 0000000..89eb4a8
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.visitor;
+
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.*;
+
+import java.util.Stack;
+
+public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisitor<CONTEXT, RESULT> {
+
+  /**
+   * The prehook is called before each node is visited.
+   */
+  @SuppressWarnings("unused")
+  public void preHook(LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack, CONTEXT data)
+      throws PlanningException {
+  }
+
+  /**
+   * The posthook is called after each node is visited.
+   */
+  @SuppressWarnings("unused")
+  public void postHook(LogicalPlan plan, LogicalNode node, Stack<LogicalNode> stack, CONTEXT data)
+      throws PlanningException {
+  }
+
+  public CONTEXT visit(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block)
+      throws PlanningException {
+    visit(context, plan, block, block.getRoot(), new Stack<LogicalNode>());
+    return context;
+  }
+
+  /**
+   * visit visits each logicalNode recursively.
+   */
+  public RESULT visit(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalNode node,
+                      Stack<LogicalNode> stack)
+      throws PlanningException {
+    RESULT current;
+    switch (node.getType()) {
+      case ROOT:
+        current = visitRoot(context, plan, block, (LogicalRootNode) node, stack);
+        break;
+      case EXPRS:
+        return null;
+      case PROJECTION:
+        current = visitProjection(context, plan, block, (ProjectionNode) node, stack);
+        break;
+      case LIMIT:
+        current = visitLimit(context, plan, block, (LimitNode) node, stack);
+        break;
+      case SORT:
+        current = visitSort(context, plan, block, (SortNode) node, stack);
+        break;
+      case HAVING:
+        current = visitHaving(context, plan, block, (HavingNode) node, stack);
+        break;
+      case GROUP_BY:
+        current = visitGroupBy(context, plan, block, (GroupbyNode) node, stack);
+        break;
+      case WINDOW_AGG:
+        current = visitWindowAgg(context, plan, block, (WindowAggNode) node, stack);
+        break;
+      case DISTINCT_GROUP_BY:
+        current = visitDistinct(context, plan, block, (DistinctGroupbyNode) node, stack);
+        break;
+      case SELECTION:
+        current = visitFilter(context, plan, block, (SelectionNode) node, stack);
+        break;
+      case JOIN:
+        current = visitJoin(context, plan, block, (JoinNode) node, stack);
+        break;
+      case UNION:
+        current = visitUnion(context, plan, block, (UnionNode) node, stack);
+        break;
+      case EXCEPT:
+        current = visitExcept(context, plan, block, (ExceptNode) node, stack);
+        break;
+      case INTERSECT:
+        current = visitIntersect(context, plan, block, (IntersectNode) node, stack);
+        break;
+      case TABLE_SUBQUERY:
+        current = visitTableSubQuery(context, plan, block, (TableSubQueryNode) node, stack);
+        break;
+      case SCAN:
+        current = visitScan(context, plan, block, (ScanNode) node, stack);
+        break;
+      case PARTITIONS_SCAN:
+        current = visitPartitionedTableScan(context, plan, block, (PartitionedTableScanNode) node, stack);
+        break;
+      case STORE:
+        current = visitStoreTable(context, plan, block, (StoreTableNode) node, stack);
+        break;
+      case INSERT:
+        current = visitInsert(context, plan, block, (InsertNode) node, stack);
+        break;
+      case CREATE_DATABASE:
+        current = visitCreateDatabase(context, plan, block, (CreateDatabaseNode) node, stack);
+        break;
+      case DROP_DATABASE:
+        current = visitDropDatabase(context, plan, block, (DropDatabaseNode) node, stack);
+        break;
+      case CREATE_TABLE:
+        current = visitCreateTable(context, plan, block, (CreateTableNode) node, stack);
+        break;
+      case DROP_TABLE:
+        current = visitDropTable(context, plan, block, (DropTableNode) node, stack);
+        break;
+      case ALTER_TABLESPACE:
+        current = visitAlterTablespace(context, plan, block, (AlterTablespaceNode) node, stack);
+        break;
+      case ALTER_TABLE:
+        current = visitAlterTable(context, plan, block, (AlterTableNode) node, stack);
+        break;
+      case TRUNCATE_TABLE:
+        current = visitTruncateTable(context, plan, block, (TruncateTableNode) node, stack);
+        break;
+      default:
+        throw new PlanningException("Unknown logical node type: " + node.getType());
+    }
+
+    return current;
+  }
+
+  @Override
+  public RESULT visitRoot(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalRootNode node,
+                          Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitProjection(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ProjectionNode node,
+                                Stack<LogicalNode> stack)
+      throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitLimit(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, LimitNode node,
+                           Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitSort(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SortNode node,
+                          Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitHaving(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, HavingNode node,
+                            Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitGroupBy(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
+                             Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitWindowAgg(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, WindowAggNode node,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  public RESULT visitDistinct(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DistinctGroupbyNode node,
+                             Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitFilter(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, SelectionNode node,
+                            Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitJoin(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                          Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getLeftChild(), stack);
+    visit(context, plan, block, node.getRightChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitUnion(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node,
+                           Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild());
+    RESULT result = visit(context, plan, leftBlock, leftBlock.getRoot(), stack);
+    LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild());
+    visit(context, plan, rightBlock, rightBlock.getRoot(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitExcept(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ExceptNode node,
+                            Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getLeftChild(), stack);
+    visit(context, plan, block, node.getRightChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitIntersect(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, IntersectNode node,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getLeftChild(), stack);
+    visit(context, plan, block, node.getRightChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitTableSubQuery(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                   TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    LogicalPlan.QueryBlock childBlock = plan.getBlock(node.getSubQuery());
+    RESULT result = visit(context, plan, childBlock, childBlock.getRoot(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                          Stack<LogicalNode> stack) throws PlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                          PartitionedTableScanNode node, Stack<LogicalNode> stack)
+      throws PlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, StoreTableNode node,
+                                Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitInsert(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
+                            Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    RESULT result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitCreateDatabase(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    CreateDatabaseNode node, Stack<LogicalNode> stack) throws PlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitDropDatabase(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropDatabaseNode node, Stack<LogicalNode> stack) throws PlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitCreateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateTableNode node,
+                                 Stack<LogicalNode> stack) throws PlanningException {
+    RESULT result = null;
+    stack.push(node);
+    if (node.hasSubQuery()) {
+      result = visit(context, plan, block, node.getChild(), stack);
+    }
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitDropTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropTableNode node,
+                               Stack<LogicalNode> stack) {
+    return null;
+  }
+
+  @Override
+  public RESULT visitAlterTablespace(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     AlterTablespaceNode node, Stack<LogicalNode> stack) throws PlanningException {
+    return null;
+  }
+
+  @Override
+  public RESULT visitAlterTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, AlterTableNode node,
+                                 Stack<LogicalNode> stack) {
+        return null;
+    }
+
+  @Override
+  public RESULT visitTruncateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                   TruncateTableNode node, Stack<LogicalNode> stack) throws PlanningException {
+    return null;
+  }
+}


Mime
View raw message