tajo-commits mailing list archives

From hyun...@apache.org
Subject [39/57] [abbrv] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)
Date Fri, 18 Apr 2014 11:44:42 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
new file mode 100644
index 0000000..9030629
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprNormalizer.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogUtil;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Stack;
+
+/**
+ * ExprNormalizer performs three kinds of work:
+ *
+ * <h3>1. Duplication Removal.</h3>
+ *
+ * For example, assume a simple query as follows:
+ * <pre>
+ *   select price * rate as total_price, ..., order by price * rate
+ * </pre>
+ *
+ * The expression <code>price * rate</code> is duplicated in both the select list and the order by clause.
+ * In such cases, ExprNormalizer removes duplicated expressions and replaces them with a single reference.
+ * In this case, ExprNormalizer replaces price * rate with a reference to total_price.
+ *
+ * <h3>2. Dissection of Expression</h3>
+ *
+ * An expression can be complex, including a mix of scalar and aggregation expressions.
+ * For example, assume an aggregation query as follows:
+ * <pre>
+ *   select sum(price * rate) * (1 - avg(discount_rate)), ...
+ * </pre>
+ *
+ * In this case, ExprNormalizer dissects the expression 'sum(price * rate) * (1 - avg(discount_rate))'
+ * into the following expressions:
+ * <ul>
+ *   <li>$1 = price * rate</li>
+ *   <li>$2 = sum($1)</li>
+ *   <li>$3 = avg(discount_rate)</li>
+ *   <li>$4 = $2 * (1 - $3)</li>
+ * </ul>
+ *
+ * This has two main advantages. First, it makes complex expression evaluation easier across multiple physical executors.
+ * Second, it gives more opportunities to remove duplicated expressions.
+ *
+ * <h3>3. Name Normalization</h3>
+ *
+ * Users can use qualified column names, unqualified column names or aliased column references.
+ *
+ * Consider the following example:
+ *
+ * <pre>
+ *   select rate_a as total_rate, rate_a * 100, table1.rate_a, ... WHERE total_rate * 100
+ * </pre>
+ *
+ * <code>total_rate</code>, <code>rate_a</code>, and <code>table1.rate_a</code> all refer to the same column, but
+ * they have different forms. Because of these different forms, duplication removal can be hard.
+ *
+ * In order to solve this problem, ExprNormalizer normalizes all column references to qualified names while keeping
+ * what each reference points to.
+ */
+class ExprNormalizer extends SimpleAlgebraVisitor<ExprNormalizer.ExprNormalizedResult, Object> {
+
+  public static class ExprNormalizedResult {
+    private final LogicalPlan plan;
+    private final LogicalPlan.QueryBlock block;
+
+    Expr baseExpr; // the outermost expression, which can include one or more references to the results of
+                   // aggregation functions.
+    List<NamedExpr> aggExprs = new ArrayList<NamedExpr>(); // aggregation functions
+    List<NamedExpr> scalarExprs = new ArrayList<NamedExpr>(); // scalar expressions which can be referred to
+
+    private ExprNormalizedResult(LogicalPlanner.PlanContext context) {
+      this.plan = context.plan;
+      this.block = context.queryBlock;
+    }
+
+    @Override
+    public String toString() {
+      return baseExpr.toString() + ", agg=" + aggExprs.size() + ", scalar=" + scalarExprs.size();
+    }
+  }
+
+  public ExprNormalizedResult normalize(LogicalPlanner.PlanContext context, Expr expr) throws PlanningException {
+    ExprNormalizedResult exprNormalizedResult = new ExprNormalizedResult(context);
+    Stack<Expr> stack = new Stack<Expr>();
+    stack.push(expr);
+    visit(exprNormalizedResult, new Stack<Expr>(), expr);
+    exprNormalizedResult.baseExpr = stack.pop();
+    return exprNormalizedResult;
+  }
+
+  @Override
+  public Object visitCaseWhen(ExprNormalizedResult ctx, Stack<Expr> stack, CaseWhenPredicate expr)
+      throws PlanningException {
+    stack.push(expr);
+    for (CaseWhenPredicate.WhenExpr when : expr.getWhens()) {
+      visit(ctx, stack, when.getCondition());
+      visit(ctx, stack, when.getResult());
+
+      if (OpType.isAggregationFunction(when.getCondition().getType())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(when.getCondition());
+        ctx.aggExprs.add(new NamedExpr(when.getCondition(), referenceName));
+        when.setCondition(new ColumnReferenceExpr(referenceName));
+      }
+
+      if (OpType.isAggregationFunction(when.getResult().getType())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(when.getResult());
+        ctx.aggExprs.add(new NamedExpr(when.getResult(), referenceName));
+        when.setResult(new ColumnReferenceExpr(referenceName));
+      }
+    }
+
+    if (expr.hasElseResult()) {
+      visit(ctx, stack, expr.getElseResult());
+      if (OpType.isAggregationFunction(expr.getElseResult().getType())) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(expr.getElseResult());
+        ctx.aggExprs.add(new NamedExpr(expr.getElseResult(), referenceName));
+        expr.setElseResult(new ColumnReferenceExpr(referenceName));
+      }
+    }
+    stack.pop();
+    return expr;
+  }
+
+  @Override
+  public Expr visitUnaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, UnaryOperator expr) throws PlanningException {
+    super.visitUnaryOperator(ctx, stack, expr);
+    if (OpType.isAggregationFunction(expr.getChild().getType())) {
+      // Get an anonymous column name and replace the aggregation function by the column name
+      String refName = ctx.block.namedExprsMgr.addExpr(expr.getChild());
+      ctx.aggExprs.add(new NamedExpr(expr.getChild(), refName));
+      expr.setChild(new ColumnReferenceExpr(refName));
+    }
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitBinaryOperator(ExprNormalizedResult ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
+    super.visitBinaryOperator(ctx, stack, expr);
+
+    ////////////////////////
+    // For Left Term
+    ////////////////////////
+
+    if (OpType.isAggregationFunction(expr.getLeft().getType())) {
+      String leftRefName = ctx.block.namedExprsMgr.addExpr(expr.getLeft());
+      ctx.aggExprs.add(new NamedExpr(expr.getLeft(), leftRefName));
+      expr.setLeft(new ColumnReferenceExpr(leftRefName));
+    }
+
+
+    ////////////////////////
+    // For Right Term
+    ////////////////////////
+    if (OpType.isAggregationFunction(expr.getRight().getType())) {
+      String rightRefName = ctx.block.namedExprsMgr.addExpr(expr.getRight());
+      ctx.aggExprs.add(new NamedExpr(expr.getRight(), rightRefName));
+      expr.setRight(new ColumnReferenceExpr(rightRefName));
+    }
+
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Function Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitFunction(ExprNormalizedResult ctx, Stack<Expr> stack, FunctionExpr expr) throws PlanningException {
+    stack.push(expr);
+
+    Expr param;
+    for (int i = 0; i < expr.getParams().length; i++) {
+      param = expr.getParams()[i];
+      visit(ctx, stack, param);
+
+      if (OpType.isAggregationFunction(param.getType())) {
+        String referenceName = ctx.plan.generateUniqueColumnName(param);
+        ctx.aggExprs.add(new NamedExpr(param, referenceName));
+        expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
+      }
+    }
+
+    stack.pop();
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitGeneralSetFunction(ExprNormalizedResult ctx, Stack<Expr> stack, GeneralSetFunctionExpr expr)
+      throws PlanningException {
+    stack.push(expr);
+
+    Expr param;
+    for (int i = 0; i < expr.getParams().length; i++) {
+      param = expr.getParams()[i];
+      visit(ctx, stack, param);
+
+
+      // If parameters are all constants, we don't need to dissect an aggregation expression into two parts:
+      // function and parameter parts.
+      if (!OpType.isLiteral(param.getType()) && param.getType() != OpType.Column) {
+        String referenceName = ctx.block.namedExprsMgr.addExpr(param);
+        ctx.scalarExprs.add(new NamedExpr(param, referenceName));
+        expr.getParams()[i] = new ColumnReferenceExpr(referenceName);
+      }
+    }
+    stack.pop();
+    return expr;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Literal Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public Expr visitCastExpr(ExprNormalizedResult ctx, Stack<Expr> stack, CastExpr expr) throws PlanningException {
+    super.visitCastExpr(ctx, stack, expr);
+    if (OpType.isAggregationFunction(expr.getChild().getType())) {
+      String referenceName = ctx.block.namedExprsMgr.addExpr(expr.getChild());
+      ctx.aggExprs.add(new NamedExpr(expr.getChild(), referenceName));
+      expr.setChild(new ColumnReferenceExpr(referenceName));
+    }
+    return expr;
+  }
+
+  @Override
+  public Expr visitColumnReference(ExprNormalizedResult ctx, Stack<Expr> stack, ColumnReferenceExpr expr)
+      throws PlanningException {
+    // if a column reference is not qualified, it finds and sets the qualified column name.
+    if (!(expr.hasQualifier() && CatalogUtil.isFQTableName(expr.getQualifier()))) {
+      if (!ctx.block.namedExprsMgr.contains(expr.getCanonicalName())) {
+        String normalized = ctx.plan.getNormalizedColumnName(ctx.block, expr);
+        expr.setName(normalized);
+      }
+    }
+    return expr;
+  }
+}
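As a minimal standalone sketch of the dissection described in the class javadoc above (the class and method names here are hypothetical and are not Tajo's planner API), aggregation arguments and aggregation calls can be pulled out into numbered references so that the outermost expression only refers to them:

import java.util.LinkedHashMap;
import java.util.Map;

// Toy illustration of the dissection idea; not Tajo classes.
public class DissectionSketch {
  public static void main(String[] args) {
    // Target expression: sum(price * rate) * (1 - avg(discount_rate))
    Map<String, String> named = new LinkedHashMap<String, String>();
    String p1 = register(named, "price * rate");        // $1 = price * rate
    String p2 = register(named, "sum(" + p1 + ")");     // $2 = sum($1)
    String p3 = register(named, "avg(discount_rate)");  // $3 = avg(discount_rate)
    String base = p2 + " * (1 - " + p3 + ")";           // base expression: $2 * (1 - $3)

    for (Map.Entry<String, String> e : named.entrySet()) {
      System.out.println(e.getKey() + " = " + e.getValue());
    }
    System.out.println("base = " + base);
  }

  // Assigns the next reference name ($1, $2, ...) to a sub-expression and returns it.
  private static String register(Map<String, String> named, String expr) {
    String ref = "$" + (named.size() + 1);
    named.put(ref, expr);
    return ref;
  }
}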

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
new file mode 100644
index 0000000..551393c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+
+import java.util.Set;
+import java.util.Stack;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+import static org.apache.tajo.common.TajoDataTypes.Type;
+
+/**
+ * It verifies a predicate or expression with semantic and data type checks as follows:
+ * <ul>
+ *   <li>Both operands of a binary expression are compatible with each other</li>
+ *   <li>All column references of the expression are available at this node</li>
+ * </ul>
+ */
+public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalNode> {
+  private static final ExprsVerifier instance;
+
+  static {
+    instance = new ExprsVerifier();
+  }
+
+  public static VerificationState verify(VerificationState state, LogicalNode currentNode, EvalNode expression)
+      throws PlanningException {
+    instance.visitChild(state, expression, new Stack<EvalNode>());
+    Set<Column> referredColumns = EvalTreeUtil.findUniqueColumns(expression);
+    for (Column referredColumn : referredColumns) {
+      if (!currentNode.getInSchema().contains(referredColumn)) {
+        throw new PlanningException("Invalid State: " + referredColumn + " cannot be accessible at Node ("
+            + currentNode.getPID() + ")");
+      }
+    }
+    return state;
+  }
+
+  /**
+   * It checks the compatibility of two data types.
+   */
+  private static boolean isCompatibleType(DataType dataType1, DataType dataType2) {
+    if (checkNumericType(dataType1) && checkNumericType(dataType2)) {
+      return true;
+    }
+
+    if (checkTextData(dataType1) && checkTextData(dataType2)) {
+      return true;
+    }
+
+    if (checkDateTime(dataType1) && checkDateTime(dataType2)) {
+      return true;
+    }
+
+    if (checkNetworkType(dataType1) && checkNetworkType(dataType2)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * It checks that both operands of a comparison operator are compatible with each other.
+   */
+  private static void verifyComparisonOperator(VerificationState state, BinaryEval expr) {
+    DataType leftType = expr.getLeftExpr().getValueType();
+    DataType rightType = expr.getRightExpr().getValueType();
+    if (!isCompatibleType(leftType, rightType)) {
+      state.addVerification("No operator matches the given name and argument type(s): " + expr.toString());
+    }
+  }
+
+  public EvalNode visitEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  public EvalNode visitNotEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitNotEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitLessThan(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitLessThan(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitLessThanOrEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitLessThanOrEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitGreaterThan(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitGreaterThan(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  @Override
+  public EvalNode visitGreaterThanOrEqual(VerificationState context, BinaryEval expr, Stack<EvalNode> stack) {
+    super.visitGreaterThanOrEqual(context, expr, stack);
+    verifyComparisonOperator(context, expr);
+    return expr;
+  }
+
+  private static void checkDivisionByZero(VerificationState state, BinaryEval evalNode) {
+    if (evalNode.getRightExpr().getType() == EvalType.CONST) {
+      ConstEval constEval = evalNode.getRightExpr();
+      if (constEval.getValue().asFloat8() == 0) {
+        state.addVerification("division by zero");
+      }
+    }
+  }
+
+  private static void checkArithmeticOperand(VerificationState state, BinaryEval evalNode) {
+    EvalNode leftExpr = evalNode.getLeftExpr();
+    EvalNode rightExpr = evalNode.getRightExpr();
+    if (!(checkNumericType(leftExpr.getValueType()) && checkNumericType(rightExpr.getValueType()))) {
+      state.addVerification("No operator matches the given name and argument type(s): " + evalNode.toString());
+    }
+  }
+
+  private static boolean checkNetworkType(DataType dataType) {
+    return dataType.getType() == Type.INET4 || dataType.getType() == Type.INET6;
+  }
+
+  private static boolean checkNumericType(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return Type.INT1.getNumber() < typeNumber && typeNumber <= Type.NUMERIC.getNumber();
+  }
+
+  private static boolean checkTextData(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return Type.CHAR.getNumber() < typeNumber && typeNumber <= Type.TEXT.getNumber();
+  }
+
+  private static boolean checkDateTime(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return (Type.DATE.getNumber() < typeNumber && typeNumber <= Type.INTERVAL.getNumber()) ||
+        (Type.TIMEZ.getNumber() < typeNumber && typeNumber <= Type.TIMESTAMPZ.getNumber());
+  }
+
+  @Override
+  public EvalNode visitPlus(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitPlus(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitMinus(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitMinus(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitMultiply(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitMultiply(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitDivide(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitDivide(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    checkDivisionByZero(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitModular(VerificationState context, BinaryEval evalNode, Stack<EvalNode> stack) {
+    super.visitModular(context, evalNode, stack);
+    checkArithmeticOperand(context, evalNode);
+    checkDivisionByZero(context, evalNode);
+    return evalNode;
+  }
+
+  @Override
+  public EvalNode visitFuncCall(VerificationState context, GeneralFunctionEval evalNode, Stack<EvalNode> stack) {
+    super.visitFuncCall(context, evalNode, stack);
+    if (evalNode.getArgs() != null) {
+      for (EvalNode param : evalNode.getArgs()) {
+        visitChild(context, param, stack);
+      }
+    }
+    return evalNode;
+  }
+}
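The checks above decide compatibility by whether two types fall into the same family (numeric, text, date/time, network), using protobuf type numbers as range bounds. A minimal standalone sketch of the same family-based rule, with an illustrative enum rather than Tajo's TajoDataTypes.Type:

// Standalone sketch of the family-based compatibility rule in isCompatibleType();
// the types below are illustrative, not Tajo's.
public class CompatibilitySketch {
  enum Family { NUMERIC, TEXT, DATETIME, NETWORK }

  enum ToyType {
    INT4(Family.NUMERIC), FLOAT8(Family.NUMERIC),
    CHAR(Family.TEXT), TEXT(Family.TEXT),
    DATE(Family.DATETIME), TIMESTAMP(Family.DATETIME),
    INET4(Family.NETWORK);

    final Family family;
    ToyType(Family family) { this.family = family; }
  }

  // Two operands are comparable when their types belong to the same family.
  static boolean isCompatible(ToyType a, ToyType b) {
    return a.family == b.family;
  }

  public static void main(String[] args) {
    System.out.println(isCompatible(ToyType.INT4, ToyType.FLOAT8));    // true
    System.out.println(isCompatible(ToyType.TEXT, ToyType.TIMESTAMP)); // false
  }
}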

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
new file mode 100644
index 0000000..3fb05c2
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/GroupElement.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.algebra.Aggregation;
+import org.apache.tajo.engine.eval.EvalNode;
+
+public class GroupElement implements Cloneable {
+  @Expose private Aggregation.GroupType type;
+  @Expose private EvalNode [] groupingSets;
+
+  @SuppressWarnings("unused")
+  public GroupElement() {
+    // for gson
+  }
+
+  public GroupElement(Aggregation.GroupType type, EvalNode [] groupingSets) {
+    this.type = type;
+    this.groupingSets = groupingSets;
+  }
+
+  public Aggregation.GroupType getType() {
+    return this.type;
+  }
+
+  public EvalNode [] getGroupingSets() {
+    return this.groupingSets;
+  }
+
+  public String toString() {
+    Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation()
+        .setPrettyPrinting().create();
+    return gson.toJson(this);
+  }
+
+  public Object clone() throws CloneNotSupportedException {
+    GroupElement groups = (GroupElement) super.clone();
+    groups.type = type;
+    groups.groupingSets = new EvalNode [groupingSets.length];
+    for (int i = 0; i < groupingSets.length; i++) {
+      groups.groupingSets[i] = (EvalNode) groupingSets[i].clone();
+    }
+    return groups;
+  }
+}
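GroupElement.clone() performs an element-wise deep copy of the grouping-set array. A minimal standalone sketch of that copy pattern (Item is a hypothetical stand-in for EvalNode):

// Minimal sketch of element-wise deep copying of a Cloneable array.
public class DeepCopySketch {
  static class Item implements Cloneable {
    String name;
    Item(String name) { this.name = name; }
    @Override
    public Item clone() throws CloneNotSupportedException {
      return (Item) super.clone();
    }
  }

  public static void main(String[] args) throws CloneNotSupportedException {
    Item[] original = { new Item("a"), new Item("b") };
    Item[] copy = new Item[original.length];
    for (int i = 0; i < original.length; i++) {
      // clone each element; the index is advanced only by the loop header
      copy[i] = original[i].clone();
    }
    // distinct array and distinct elements
    System.out.println(copy != original && copy[0] != original[0]); // true
  }
}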

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
new file mode 100644
index 0000000..974dc60
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -0,0 +1,280 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.join.FoundJoinOrder;
+import org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrderAlgorithm;
+import org.apache.tajo.engine.planner.logical.join.JoinGraph;
+import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm;
+import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
+import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
+import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.Stack;
+
+import static org.apache.tajo.engine.planner.LogicalPlan.BlockEdge;
+import static org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrderAlgorithm.getCost;
+
+/**
+ * This class optimizes a logical plan.
+ */
+@InterfaceStability.Evolving
+public class LogicalOptimizer {
+  private BasicQueryRewriteEngine rulesBeforeJoinOpt;
+  private BasicQueryRewriteEngine rulesAfterToJoinOpt;
+  private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
+
+  public LogicalOptimizer(TajoConf systemConf) {
+    rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
+    rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
+
+    rulesAfterToJoinOpt = new BasicQueryRewriteEngine();
+    rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule());
+    rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf));
+  }
+
+  public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
+    rulesBeforeJoinOpt.rewrite(plan);
+
+    DirectedGraphCursor<String, BlockEdge> blockCursor =
+        new DirectedGraphCursor<String, BlockEdge>(plan.getQueryBlockGraph(), plan.getRootBlock().getName());
+
+    while(blockCursor.hasNext()) {
+      optimizeJoinOrder(plan, blockCursor.nextBlock());
+    }
+
+    rulesAfterToJoinOpt.rewrite(plan);
+    return plan.getRootBlock().getRoot();
+  }
+
+  private void optimizeJoinOrder(LogicalPlan plan, String blockName) throws PlanningException {
+    LogicalPlan.QueryBlock block = plan.getBlock(blockName);
+
+    if (block.hasNode(NodeType.JOIN)) {
+      String originalOrder = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
+      double nonOptimizedJoinCost = JoinCostComputer.computeCost(plan, block);
+
+      // finding relations and filter expressions
+      JoinGraphContext joinGraphContext = JoinGraphBuilder.buildJoinGraph(plan, block);
+
+      // find the best join order and restore the remaining filters
+      FoundJoinOrder order = joinOrderAlgorithm.findBestOrder(plan, block,
+          joinGraphContext.joinGraph, joinGraphContext.relationsForProduct);
+      JoinNode newJoinNode = order.getOrderedJoin();
+      JoinNode old = PlannerUtil.findTopNode(block.getRoot(), NodeType.JOIN);
+
+      JoinTargetCollector collector = new JoinTargetCollector();
+      Set<Target> targets = new LinkedHashSet<Target>();
+      collector.visitJoin(targets, plan, block, old, new Stack<LogicalNode>());
+
+      if (targets.size() == 0) {
+        newJoinNode.setTargets(PlannerUtil.schemaToTargets(old.getOutSchema()));
+      } else {
+        newJoinNode.setTargets(targets.toArray(new Target[targets.size()]));
+      }
+
+      PlannerUtil.replaceNode(plan, block.getRoot(), old, newJoinNode);
+      String optimizedOrder = JoinOrderStringBuilder.buildJoinOrderString(plan, block);
+      block.addPlanHistory("Non-optimized join order: " + originalOrder + " (cost: " + nonOptimizedJoinCost + ")");
+      block.addPlanHistory("Optimized join order    : " + optimizedOrder + " (cost: " + order.getCost() + ")");
+    }
+  }
+
+  private static class JoinTargetCollector extends BasicLogicalPlanVisitor<Set<Target>, LogicalNode> {
+    @Override
+    public LogicalNode visitJoin(Set<Target> ctx, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                                 Stack<LogicalNode> stack)
+        throws PlanningException {
+      super.visitJoin(ctx, plan, block, node, stack);
+
+      if (node.hasTargets()) {
+        for (Target target : node.getTargets()) {
+          ctx.add(target);
+        }
+      }
+      return node;
+    }
+  }
+
+  private static class JoinGraphContext {
+    JoinGraph joinGraph = new JoinGraph();
+    Set<EvalNode> quals = Sets.newHashSet();
+    Set<String> relationsForProduct = Sets.newHashSet();
+  }
+
+  private static class JoinGraphBuilder extends BasicLogicalPlanVisitor<JoinGraphContext, LogicalNode> {
+    private final static JoinGraphBuilder instance;
+
+    static {
+      instance = new JoinGraphBuilder();
+    }
+
+    /**
+     * This is based on the assumption that all join and filter conditions are placed on the right join and
+     * scan operators. In other words, filter push down must be performed before this method.
+     * Otherwise, this method may build an incorrect join graph.
+     */
+    public static JoinGraphContext buildJoinGraph(LogicalPlan plan, LogicalPlan.QueryBlock block)
+        throws PlanningException {
+      JoinGraphContext joinGraphContext = new JoinGraphContext();
+      instance.visit(joinGraphContext, plan, block);
+      return joinGraphContext;
+    }
+
+    public LogicalNode visitFilter(JoinGraphContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                   SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
+      super.visitFilter(context, plan, block, node, stack);
+      context.quals.addAll(Lists.newArrayList(AlgebraicUtil.toConjunctiveNormalFormArray(node.getQual())));
+      return node;
+    }
+
+    @Override
+    public LogicalNode visitJoin(JoinGraphContext joinGraphContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 JoinNode joinNode, Stack<LogicalNode> stack)
+        throws PlanningException {
+      super.visitJoin(joinGraphContext, plan, block, joinNode, stack);
+      if (joinNode.hasJoinQual()) {
+        joinGraphContext.joinGraph.addJoin(plan, block, joinNode);
+      } else {
+        LogicalNode leftChild = joinNode.getLeftChild();
+        LogicalNode rightChild = joinNode.getRightChild();
+        if (leftChild instanceof RelationNode) {
+          RelationNode rel = (RelationNode) leftChild;
+          joinGraphContext.relationsForProduct.add(rel.getCanonicalName());
+        }
+        if (rightChild instanceof RelationNode) {
+          RelationNode rel = (RelationNode) rightChild;
+          joinGraphContext.relationsForProduct.add(rel.getCanonicalName());
+        }
+      }
+      return joinNode;
+    }
+  }
+
+  public static class JoinOrderStringBuilder extends BasicLogicalPlanVisitor<StringBuilder, LogicalNode> {
+    private static final JoinOrderStringBuilder instance;
+    static {
+      instance = new JoinOrderStringBuilder();
+    }
+
+    public static JoinOrderStringBuilder getInstance() {
+      return instance;
+    }
+
+    public static String buildJoinOrderString(LogicalPlan plan, LogicalPlan.QueryBlock block) throws PlanningException {
+      StringBuilder originalOrder = new StringBuilder();
+      instance.visit(originalOrder, plan, block);
+      return originalOrder.toString();
+    }
+
+    @Override
+    public LogicalNode visitJoin(StringBuilder sb, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
+                                 Stack<LogicalNode> stack)
+        throws PlanningException {
+      stack.push(joinNode);
+      sb.append("(");
+      visit(sb, plan, block, joinNode.getLeftChild(), stack);
+      sb.append(" ").append(getJoinNotation(joinNode.getJoinType())).append(" ");
+      visit(sb, plan, block, joinNode.getRightChild(), stack);
+      sb.append(")");
+      stack.pop();
+      return joinNode;
+    }
+
+    private static String getJoinNotation(JoinType joinType) {
+      switch (joinType) {
+      case CROSS: return "⋈";
+      case INNER: return "⋈θ";
+      case LEFT_OUTER: return "⟕";
+      case RIGHT_OUTER: return "⟖";
+      case FULL_OUTER: return "⟗";
+      case LEFT_SEMI: return "⋉";
+      case RIGHT_SEMI: return "⋊";
+      case LEFT_ANTI: return "▷";
+      }
+      return ",";
+    }
+
+    @Override
+    public LogicalNode visitTableSubQuery(StringBuilder sb, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                          TableSubQueryNode node, Stack<LogicalNode> stack) {
+      sb.append(node.getTableName());
+      return node;
+    }
+
+    public LogicalNode visitScan(StringBuilder sb, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                                 Stack<LogicalNode> stack) {
+      sb.append(node.getTableName());
+      return node;
+    }
+  }
+
+  private static class CostContext {
+    double accumulatedCost = 0;
+  }
+
+  public static class JoinCostComputer extends BasicLogicalPlanVisitor<CostContext, LogicalNode> {
+    private static final JoinCostComputer instance;
+
+    static {
+      instance = new JoinCostComputer();
+    }
+
+    public static double computeCost(LogicalPlan plan, LogicalPlan.QueryBlock block) throws PlanningException {
+      CostContext costContext = new CostContext();
+      instance.visit(costContext, plan, block);
+      return costContext.accumulatedCost;
+    }
+
+    @Override
+    public LogicalNode visitJoin(CostContext joinGraphContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 JoinNode joinNode, Stack<LogicalNode> stack)
+        throws PlanningException {
+      super.visitJoin(joinGraphContext, plan, block, joinNode, stack);
+
+      double filterFactor = 1;
+      if (joinNode.hasJoinQual()) {
+        EvalNode [] quals = AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual());
+        filterFactor = Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, quals.length);
+      }
+
+      if (joinNode.getLeftChild() instanceof RelationNode) {
+        joinGraphContext.accumulatedCost = getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild())
+            * filterFactor;
+      } else {
+        joinGraphContext.accumulatedCost = joinGraphContext.accumulatedCost +
+            (joinGraphContext.accumulatedCost * getCost(joinNode.getRightChild()) * filterFactor);
+      }
+
+      return joinNode;
+    }
+  }
+}
\ No newline at end of file
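JoinCostComputer accumulates a left-deep join cost: the bottom-most join costs cost(left) * cost(right) * selectivity^quals, and every further join multiplies the accumulated cost by the cost of its right side and the filter factor. A minimal standalone sketch; the cardinalities and the 0.1 selection factor below are illustrative assumptions:

// Standalone sketch of the left-deep cost accumulation in JoinCostComputer.visitJoin().
public class JoinCostSketch {
  static final double DEFAULT_SELECTION_FACTOR = 0.1;

  public static void main(String[] args) {
    double[] relationCosts = { 1000d, 500d, 200d }; // per-relation costs (e.g., cardinalities)
    int[] qualsPerJoin = { 1, 2 };                  // number of CNF join predicates at each join

    // bottom-most join: cost(left) * cost(right) * selectivity^quals
    double accumulated = relationCosts[0] * relationCosts[1]
        * Math.pow(DEFAULT_SELECTION_FACTOR, qualsPerJoin[0]);

    // each further join adds accumulated * cost(right) * filterFactor to the accumulated cost
    for (int i = 2; i < relationCosts.length; i++) {
      double filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR, qualsPerJoin[i - 1]);
      accumulated = accumulated + (accumulated * relationCosts[i] * filterFactor);
    }
    System.out.println("estimated cost: " + accumulated);
  }
}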

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
new file mode 100644
index 0000000..98fbf42
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.ObjectUtils;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.annotation.NotThreadSafe;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.exception.NoSuchColumnException;
+import org.apache.tajo.engine.exception.VerifyException;
+import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
+import org.apache.tajo.engine.planner.graph.SimpleDirectedGraph;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.planner.logical.RelationNode;
+import org.apache.tajo.util.TUtil;
+
+import java.lang.reflect.Constructor;
+import java.util.*;
+
+/**
+ * This represents and keeps all the information about the query plan for a query.
+ */
+@NotThreadSafe
+public class LogicalPlan {
+  /** the prefix character for virtual tables */
+  public static final char VIRTUAL_TABLE_PREFIX='#';
+  public static final char NONAMED_COLUMN_PREFIX='?';
+  /** it indicates the root block */
+  public static final String ROOT_BLOCK = VIRTUAL_TABLE_PREFIX + "ROOT";
+  public static final String NONAME_BLOCK_PREFIX = VIRTUAL_TABLE_PREFIX + "QB_";
+  private static final int NO_SEQUENCE_PID = -1;
+  private int nextPid = 0;
+  private Integer noNameBlockId = 0;
+  private Integer noNameColumnId = 0;
+
+  /** a map from a block name to a block plan */
+  private Map<String, QueryBlock> queryBlocks = new LinkedHashMap<String, QueryBlock>();
+  private Map<Integer, LogicalNode> nodeMap = new HashMap<Integer, LogicalNode>();
+  private Map<Integer, QueryBlock> queryBlockByPID = new HashMap<Integer, QueryBlock>();
+  private Map<String, String> exprToBlockNameMap = TUtil.newHashMap();
+  private SimpleDirectedGraph<String, BlockEdge> queryBlockGraph = new SimpleDirectedGraph<String, BlockEdge>();
+
+  /** planning and optimization log */
+  private List<String> planingHistory = Lists.newArrayList();
+  LogicalPlanner planner;
+
+  private boolean isExplain;
+  private final String currentDatabase;
+
+  public LogicalPlan(String currentDatabase, LogicalPlanner planner) {
+    this.currentDatabase = currentDatabase;
+    this.planner = planner;
+  }
+
+  /**
+   * Create a LogicalNode instance of the given type. Each LogicalNode instance is given a unique plan node id (PID).
+   *
+   * @param theClass The class to be created
+   * @return a LogicalNode instance identified by a unique plan node id (PID).
+   */
+  public <T extends LogicalNode> T createNode(Class<T> theClass) {
+    try {
+      Constructor<T> ctor = theClass.getConstructor(int.class);
+      return ctor.newInstance(newPID());
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Create a LogicalNode instance of the given type. The created instance is not given a unique plan node id (PID).
+   * This method must only be used after all query planning and optimization phases.
+   *
+   * @param theClass The class to be created
+   * @return a LogicalNode instance
+   */
+  public static <T extends LogicalNode> T createNodeWithoutPID(Class<T> theClass) {
+    try {
+      Constructor<T> ctor = theClass.getConstructor(int.class);
+      return ctor.newInstance(NO_SEQUENCE_PID);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void setExplain() {
+    isExplain = true;
+  }
+
+  public boolean isExplain() {
+    return isExplain;
+  }
+
+  /**
+   * Create a new {@link QueryBlock} and return it.
+   *
+   * @param blockName the query block name
+   * @return a created query block
+   */
+  public QueryBlock newAndGetBlock(String blockName) {
+    QueryBlock block = new QueryBlock(blockName);
+    queryBlocks.put(blockName, block);
+    return block;
+  }
+
+  public int newPID() {
+    return nextPid++;
+  }
+
+  public QueryBlock newQueryBlock() {
+    return newAndGetBlock(NONAME_BLOCK_PREFIX + (noNameBlockId++));
+  }
+
+  /**
+   * It generates a unique column name from an EvalNode. It is usually used for an expression or predicate without
+   * a specified name (i.e., alias).
+   */
+  public String generateUniqueColumnName(EvalNode evalNode) {
+    String prefix = evalNode.getName();
+    return attachSeqIdToGeneratedColumnName(prefix).toLowerCase();
+  }
+
+  /**
+   * It generates a unique column name from an Expr. It is usually used for an expression or predicate without
+   * a specified name (i.e., alias).
+   */
+  public String generateUniqueColumnName(Expr expr) {
+    String generatedName;
+    if (expr.getType() == OpType.Column) {
+      generatedName = ((ColumnReferenceExpr) expr).getCanonicalName();
+    } else { // if a generated column name
+      generatedName = attachSeqIdToGeneratedColumnName(getGeneratedPrefixFromExpr(expr));
+    }
+    return generatedName;
+  }
+
+  /**
+   * It attaches a sequence id to a generated column name. It always keeps generated names unique.
+   */
+  private String attachSeqIdToGeneratedColumnName(String prefix) {
+    int sequence = noNameColumnId++;
+    return NONAMED_COLUMN_PREFIX + prefix.toLowerCase() + (sequence > 0 ? "_" + sequence : "");
+  }
+
+  /**
+   * It generates a column reference prefix name. It is usually used for an expression or predicate without
+   * a specified name (i.e., alias). For example, a predicate in WHERE does not have any alias name.
+   * It just returns a prefix name. In other words, it always returns the same prefix for the same type of expressions.
+   * So, you must add some suffix to the returned name in order to distinguish reference names.
+   */
+  private static String getGeneratedPrefixFromExpr(Expr expr) {
+    String prefix;
+
+    switch (expr.getType()) {
+    case Column:
+      prefix = ((ColumnReferenceExpr) expr).getCanonicalName();
+      break;
+    case CountRowsFunction:
+      prefix = "count";
+      break;
+    case GeneralSetFunction:
+      GeneralSetFunctionExpr setFunction = (GeneralSetFunctionExpr) expr;
+      prefix = setFunction.getSignature();
+      break;
+    case Function:
+      FunctionExpr function = (FunctionExpr) expr;
+      prefix = function.getSignature();
+      break;
+    default:
+      prefix = expr.getType().name();
+    }
+    return prefix;
+  }
+
+  public QueryBlock getRootBlock() {
+    return queryBlocks.get(ROOT_BLOCK);
+  }
+
+  public QueryBlock getBlock(String blockName) {
+    return queryBlocks.get(blockName);
+  }
+
+  public QueryBlock getBlock(LogicalNode node) {
+    return queryBlockByPID.get(node.getPID());
+  }
+
+  public void removeBlock(QueryBlock block) {
+    queryBlocks.remove(block.getName());
+    List<Integer> tobeRemoved = new ArrayList<Integer>();
+    for (Map.Entry<Integer, QueryBlock> entry : queryBlockByPID.entrySet()) {
+      if (entry.getValue().equals(block)) { // remove only the nodes that belong to this block
+        tobeRemoved.add(entry.getKey());
+      }
+    }
+    for (Integer rn : tobeRemoved) {
+      queryBlockByPID.remove(rn);
+    }
+  }
+
+  public void connectBlocks(QueryBlock srcBlock, QueryBlock targetBlock, BlockType type) {
+    queryBlockGraph.addEdge(srcBlock.getName(), targetBlock.getName(), new BlockEdge(srcBlock, targetBlock, type));
+  }
+
+  public QueryBlock getParentBlock(QueryBlock block) {
+    return queryBlocks.get(queryBlockGraph.getParent(block.getName(), 0));
+  }
+
+  public List<QueryBlock> getChildBlocks(QueryBlock block) {
+    List<QueryBlock> childBlocks = TUtil.newList();
+    for (String blockName : queryBlockGraph.getChilds(block.getName())) {
+      childBlocks.add(queryBlocks.get(blockName));
+    }
+    return childBlocks;
+  }
+
+  public void mapExprToBlock(Expr expr, String blockName) {
+    exprToBlockNameMap.put(ObjectUtils.identityToString(expr), blockName);
+  }
+
+  public QueryBlock getBlockByExpr(Expr expr) {
+    return getBlock(exprToBlockNameMap.get(ObjectUtils.identityToString(expr)));
+  }
+
+  public String getBlockNameByExpr(Expr expr) {
+    return exprToBlockNameMap.get(ObjectUtils.identityToString(expr));
+  }
+
+  public Collection<QueryBlock> getQueryBlocks() {
+    return queryBlocks.values();
+  }
+
+  public SimpleDirectedGraph<String, BlockEdge> getQueryBlockGraph() {
+    return queryBlockGraph;
+  }
+
+  public String getNormalizedColumnName(QueryBlock block, ColumnReferenceExpr columnRef)
+      throws PlanningException {
+    Column found = resolveColumn(block, columnRef);
+    if (found == null) {
+      throw new NoSuchColumnException(columnRef.getCanonicalName());
+    }
+    return found.getQualifiedName();
+  }
+
+  public String resolveDatabase(QueryBlock block, String tableName) throws PlanningException {
+    List<String> found = new ArrayList<String>();
+    for (RelationNode relation : block.getRelations()) {
+      // check alias name or table name
+      if (CatalogUtil.extractSimpleName(relation.getCanonicalName()).equals(tableName) ||
+          CatalogUtil.extractSimpleName(relation.getTableName()).equals(tableName)) {
+        // obtain the database name
+        found.add(CatalogUtil.extractQualifier(relation.getTableName()));
+      }
+    }
+
+    if (found.size() == 0) {
+      return null;
+    } else if (found.size() > 1) {
+      throw new PlanningException("Ambiguous table name \"" + tableName + "\"");
+    }
+
+    return found.get(0);
+  }
+
+  /**
+   * It resolves a column.
+   */
+  public Column resolveColumn(QueryBlock block, ColumnReferenceExpr columnRef) throws PlanningException {
+
+    if (columnRef.hasQualifier()) { // if a column reference is qualified
+
+      String qualifier;
+      String canonicalName;
+      String qualifiedName;
+
+      if (CatalogUtil.isFQTableName(columnRef.getQualifier())) {
+        qualifier = columnRef.getQualifier();
+        canonicalName = columnRef.getCanonicalName();
+      } else {
+        String resolvedDatabaseName = resolveDatabase(block, columnRef.getQualifier());
+        if (resolvedDatabaseName == null) {
+          throw new NoSuchColumnException(columnRef.getQualifier());
+        }
+        qualifier = CatalogUtil.buildFQName(resolvedDatabaseName, columnRef.getQualifier());
+        canonicalName = CatalogUtil.buildFQName(qualifier, columnRef.getName());
+      }
+      qualifiedName = CatalogUtil.buildFQName(qualifier, columnRef.getName());
+
+      RelationNode relationOp = block.getRelation(qualifier);
+
+      // if a column name is outside of this query block
+      if (relationOp == null) {
+        // TODO - nested query can only refer outer query block? or not?
+        for (QueryBlock eachBlock : queryBlocks.values()) {
+          if (eachBlock.existsRelation(qualifier)) {
+            relationOp = eachBlock.getRelation(qualifier);
+          }
+        }
+      }
+
+      // If we cannot find any relation against a qualified column name
+      if (relationOp == null) {
+        throw new NoSuchColumnException(canonicalName);
+      }
+
+      if (block.isAlreadyRenamedTableName(CatalogUtil.extractQualifier(canonicalName))) {
+        String changedName = CatalogUtil.buildFQName(
+            relationOp.getCanonicalName(),
+            CatalogUtil.extractSimpleName(canonicalName));
+        canonicalName = changedName;
+      }
+
+      Schema schema = relationOp.getTableSchema();
+      Column column = schema.getColumn(canonicalName);
+      if (column == null) {
+        throw new NoSuchColumnException(canonicalName);
+      }
+
+      // If the code reaches here, a column has been found.
+      // However, it may have been aliased by a lower logical node.
+      // If the column is aliased, the found name may not be usable in upper nodes.
+
+      // Here, we check whether the column reference is already aliased.
+      // If so, the name is replaced with the aliased name.
+      LogicalNode currentNode = block.getCurrentNode();
+
+      // The condition (currentNode.getInSchema().contains(column)) means that
+      // the column can be used at the current node, so we don't need to find an alias name.
+      if (currentNode != null && !currentNode.getInSchema().contains(column)
+          && currentNode.getType() != NodeType.TABLE_SUBQUERY) {
+        List<Column> candidates = TUtil.newList();
+        if (block.namedExprsMgr.isAliased(qualifiedName)) {
+          String alias = block.namedExprsMgr.getAlias(canonicalName);
+          Column found = resolveColumn(block, new ColumnReferenceExpr(alias));
+          if (found != null) {
+            candidates.add(found);
+          }
+        }
+        if (!candidates.isEmpty()) {
+          return ensureUniqueColumn(candidates);
+        }
+      }
+
+      return column;
+    } else { // if a column reference is not qualified
+
+      // Trying to find the column within the current block
+
+      if (block.currentNode != null && block.currentNode.getInSchema() != null) {
+        Column found = block.currentNode.getInSchema().getColumn(columnRef.getCanonicalName());
+        if (found != null) {
+          return found;
+        }
+      }
+
+      if (block.getLatestNode() != null) {
+        Column found = block.getLatestNode().getOutSchema().getColumn(columnRef.getName());
+        if (found != null) {
+          return found;
+        }
+      }
+
+      List<Column> candidates = TUtil.newList();
+      // Trying to find columns from aliased references.
+      if (block.namedExprsMgr.isAliased(columnRef.getCanonicalName())) {
+        String originalName = block.namedExprsMgr.getAlias(columnRef.getCanonicalName());
+        Column found = resolveColumn(block, new ColumnReferenceExpr(originalName));
+        if (found != null) {
+          candidates.add(found);
+        }
+      }
+      if (!candidates.isEmpty()) {
+        return ensureUniqueColumn(candidates);
+      }
+
+      // Trying to find columns from other relations in the current block
+      for (RelationNode rel : block.getRelations()) {
+        Column found = rel.getTableSchema().getColumn(columnRef.getName());
+        if (found != null) {
+          candidates.add(found);
+        }
+      }
+
+      if (!candidates.isEmpty()) {
+        return ensureUniqueColumn(candidates);
+      }
+
+      // Trying to find columns from other relations in other blocks
+      for (QueryBlock eachBlock : queryBlocks.values()) {
+        for (RelationNode rel : eachBlock.getRelations()) {
+          Column found = rel.getTableSchema().getColumn(columnRef.getName());
+          if (found != null) {
+            candidates.add(found);
+          }
+        }
+      }
+
+      if (!candidates.isEmpty()) {
+        return ensureUniqueColumn(candidates);
+      }
+
+      // Trying to find columns from schema in current block.
+      if (block.getSchema() != null) {
+        Column found = block.getSchema().getColumn(columnRef.getName());
+        if (found != null) {
+          candidates.add(found);
+        }
+      }
+
+      if (!candidates.isEmpty()) {
+        return ensureUniqueColumn(candidates);
+      }
+
+      throw new VerifyException("ERROR: no such a column name "+ columnRef.getCanonicalName());
+    }
+  }
+
+  private static Column ensureUniqueColumn(List<Column> candidates)
+      throws VerifyException {
+    if (candidates.size() == 1) {
+      return candidates.get(0);
+    } else if (candidates.size() > 2) {
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+      for (Column column : candidates) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(column);
+      }
+      throw new VerifyException("Ambiguous Column Name: " + sb.toString());
+    } else {
+      return null;
+    }
+  }
+
+  public String getQueryGraphAsString() {
+    StringBuilder sb = new StringBuilder();
+
+    sb.append("\n-----------------------------\n");
+    sb.append("Query Block Graph\n");
+    sb.append("-----------------------------\n");
+    sb.append(queryBlockGraph.toStringGraph(getRootBlock().getName()));
+    sb.append("-----------------------------\n");
+    sb.append("Optimization Log:\n");
+    DirectedGraphCursor<String, BlockEdge> cursor =
+        new DirectedGraphCursor<String, BlockEdge>(queryBlockGraph, getRootBlock().getName());
+    while(cursor.hasNext()) {
+      QueryBlock block = getBlock(cursor.nextBlock());
+      if (block.getPlanHistory().size() > 0) {
+        sb.append("\n[").append(block.getName()).append("]\n");
+        for (String log : block.getPlanHistory()) {
+          sb.append("> ").append(log).append("\n");
+        }
+      }
+    }
+    sb.append("-----------------------------\n");
+    sb.append("\n");
+
+    sb.append(getLogicalPlanAsString());
+
+    return sb.toString();
+  }
+
+  public String getLogicalPlanAsString() {
+    ExplainLogicalPlanVisitor explain = new ExplainLogicalPlanVisitor();
+
+    StringBuilder explains = new StringBuilder();
+    try {
+      ExplainLogicalPlanVisitor.Context explainContext = explain.getBlockPlanStrings(this, getRootBlock().getRoot());
+      while(!explainContext.explains.empty()) {
+        explains.append(
+            ExplainLogicalPlanVisitor.printDepthString(explainContext.getMaxDepth(), explainContext.explains.pop()));
+      }
+    } catch (PlanningException e) {
+      throw new RuntimeException(e);
+    }
+
+    return explains.toString();
+  }
+
+  public void addHistory(String string) {
+    planingHistory.add(string);
+  }
+
+  public List<String> getHistory() {
+    return planingHistory;
+  }
+
+  @Override
+  public String toString() {
+    return getQueryGraphAsString();
+  }
+
+  ///////////////////////////////////////////////////////////////////////////
+  //                             Query Block
+  ///////////////////////////////////////////////////////////////////////////
+
+  public static enum BlockType {
+    TableSubQuery,
+    ScalarSubQuery
+  }
+
+  public static class BlockEdge {
+    private String childName;
+    private String parentName;
+    private BlockType blockType;
+
+
+    public BlockEdge(String childName, String parentName, BlockType blockType) {
+      this.childName = childName;
+      this.parentName = parentName;
+      this.blockType = blockType;
+    }
+
+    public BlockEdge(QueryBlock child, QueryBlock parent, BlockType blockType) {
+      this(child.getName(), parent.getName(), blockType);
+    }
+
+    public String getParentName() {
+      return parentName;
+    }
+
+    public String getChildName() {
+      return childName;
+    }
+
+    public BlockType getBlockType() {
+      return blockType;
+    }
+  }
+
+  public class QueryBlock {
+    private final String blockName;
+    private LogicalNode rootNode;
+    private NodeType rootType;
+
+    // transient states
+    private final Map<String, RelationNode> canonicalNameToRelationMap = TUtil.newHashMap();
+    private final Map<String, List<String>> aliasMap = TUtil.newHashMap();
+    private final Map<OpType, List<Expr>> operatorToExprMap = TUtil.newHashMap();
+    /**
+     * It is a map between node type and node. Node types can be duplicated, so only the latest node of each type is kept.
+     */
+    private final Map<NodeType, LogicalNode> nodeTypeToNodeMap = TUtil.newHashMap();
+    private final Map<String, LogicalNode> exprToNodeMap = TUtil.newHashMap();
+    final NamedExprsManager namedExprsMgr;
+
+    private LogicalNode currentNode;
+    private LogicalNode latestNode;
+    private final Set<JoinType> includedJoinTypes = TUtil.newHashSet();
+    /**
+     * Set to true if this query block has either implicit or explicit aggregation.
+     */
+    private boolean aggregationRequired = false;
+    private Schema schema;
+
+    /** It contains a planning log for this block */
+    private final List<String> planingHistory = Lists.newArrayList();
+    /** It is for debugging or unit tests */
+    private Target [] rawTargets;
+
+    public QueryBlock(String blockName) {
+      this.blockName = blockName;
+      this.namedExprsMgr = new NamedExprsManager(LogicalPlan.this);
+    }
+
+    public String getName() {
+      return blockName;
+    }
+
+    public void refresh() {
+      setRoot(rootNode);
+    }
+
+    public void setRoot(LogicalNode blockRoot) {
+      this.rootNode = blockRoot;
+      if (blockRoot instanceof LogicalRootNode) {
+        LogicalRootNode rootNode = (LogicalRootNode) blockRoot;
+        rootType = rootNode.getChild().getType();
+      }
+    }
+
+    public <NODE extends LogicalNode> NODE getRoot() {
+      return (NODE) rootNode;
+    }
+
+    public NodeType getRootType() {
+      return rootType;
+    }
+
+    public Target [] getRawTargets() {
+      return rawTargets;
+    }
+
+    public void setRawTargets(Target[] rawTargets) {
+      this.rawTargets = rawTargets;
+    }
+
+    public boolean existsRelation(String name) {
+      return canonicalNameToRelationMap.containsKey(name);
+    }
+
+    public boolean isAlreadyRenamedTableName(String name) {
+      return aliasMap.containsKey(name);
+    }
+
+    public RelationNode getRelation(String name) {
+      if (canonicalNameToRelationMap.containsKey(name)) {
+        return canonicalNameToRelationMap.get(name);
+      }
+
+      if (aliasMap.containsKey(name)) {
+        return canonicalNameToRelationMap.get(aliasMap.get(name).get(0));
+      }
+
+      return null;
+    }
+
+    public void addRelation(RelationNode relation) {
+      if (relation.hasAlias()) {
+        TUtil.putToNestedList(aliasMap, relation.getTableName(), relation.getCanonicalName());
+      }
+      canonicalNameToRelationMap.put(relation.getCanonicalName(), relation);
+    }
+
+    public Collection<RelationNode> getRelations() {
+      return this.canonicalNameToRelationMap.values();
+    }
+
+    public boolean hasTableExpression() {
+      return this.canonicalNameToRelationMap.size() > 0;
+    }
+
+    public void setSchema(Schema schema) {
+      this.schema = schema;
+    }
+
+    public Schema getSchema() {
+      return schema;
+    }
+
+    public NamedExprsManager getNamedExprsManager() {
+      return namedExprsMgr;
+    }
+
+    public void updateCurrentNode(Expr expr) throws PlanningException {
+
+      if (expr.getType() != OpType.RelationList) { // skip relation list because it is a virtual expr.
+        this.currentNode = exprToNodeMap.get(ObjectUtils.identityToString(expr));
+        if (currentNode == null) {
+          throw new PlanningException("Unregistered Algebra Expression: " + expr.getType());
+        }
+      }
+    }
+
+    public <T extends LogicalNode> T getCurrentNode() {
+      return (T) this.currentNode;
+    }
+
+    public void updateLatestNode(LogicalNode node) {
+      this.latestNode = node;
+    }
+
+    public <T extends LogicalNode> T getLatestNode() {
+      return (T) this.latestNode;
+    }
+
+    public void setAlgebraicExpr(Expr expr) {
+      TUtil.putToNestedList(operatorToExprMap, expr.getType(), expr);
+    }
+
+    public boolean hasAlgebraicExpr(OpType opType) {
+      return operatorToExprMap.containsKey(opType);
+    }
+
+    public <T extends Expr> List<T> getAlgebraicExpr(OpType opType) {
+      return (List<T>) operatorToExprMap.get(opType);
+    }
+
+    public <T extends Expr> T getSingletonExpr(OpType opType) {
+      if (hasAlgebraicExpr(opType)) {
+        return (T) operatorToExprMap.get(opType).get(0);
+      } else {
+        return null;
+      }
+    }
+
+    public boolean hasNode(NodeType nodeType) {
+      return nodeTypeToNodeMap.containsKey(nodeType);
+    }
+
+    public void registerNode(LogicalNode node) {
+      // id -> node
+      nodeMap.put(node.getPID(), node);
+
+      // Note: node types can collide, so this map is only reliable for filter, group-by, sort, limit, and projection nodes, each of which occurs at most once per query block.
+      nodeTypeToNodeMap.put(node.getType(), node);
+
+      queryBlockByPID.put(node.getPID(), this);
+    }
+
+    public <T extends LogicalNode> T getNode(NodeType nodeType) {
+      return (T) nodeTypeToNodeMap.get(nodeType);
+    }
+
+    // expr -> node
+    public void registerExprWithNode(Expr expr, LogicalNode node) {
+      exprToNodeMap.put(ObjectUtils.identityToString(expr), node);
+    }
+
+    public <T extends LogicalNode> T getNodeFromExpr(Expr expr) {
+      return (T) exprToNodeMap.get(ObjectUtils.identityToString(expr));
+    }
+
+    /**
+     * This flag can change as the plan is generated.
+     *
+     * A true value means that this query block still needs an aggregation phase. Once an aggregation plan is
+     * added to the block, the flag becomes false because no further aggregation phase is required. It is mainly
+     * used to add an aggregation phase for SELECT statements that aggregate without a group-by clause.
+     *
+     * @return true if aggregation is required but this block does not yet have an aggregation phase.
+     */
+    public boolean isAggregationRequired() {
+      return this.aggregationRequired;
+    }
+
+    /**
+     * Unsets the aggregation-required flag. It must be called after an aggregation phase has been added to this block.
+     */
+    public void unsetAggregationRequire() {
+      this.aggregationRequired = false;
+    }
+
+    public void setAggregationRequire() {
+      aggregationRequired = true;
+    }
+
+    public boolean containsJoinType(JoinType joinType) {
+      return includedJoinTypes.contains(joinType);
+    }
+
+    public void addJoinType(JoinType joinType) {
+      includedJoinTypes.add(joinType);
+    }
+
+    public List<String> getPlanHistory() {
+      return planingHistory;
+    }
+
+    public void addPlanHistory(String history) {
+      this.planingHistory.add(history);
+    }
+
+    public String toString() {
+      return blockName;
+    }
+  }
+}
\ No newline at end of file
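For readers following the patch, here is a minimal, standalone sketch of the name-resolution rule that QueryBlock.addRelation/getRelation implement above: a relation is registered under its canonical name and, when aliased, also under its original table name, and a lookup tries the canonical map first and then falls back to the alias map. This is not Tajo code; the class, relation names, and placeholder node values below are invented for illustration.

// Standalone sketch (not part of the patch): mirrors the lookup order of QueryBlock.getRelation,
// first the canonical-name map and then the alias map, using plain JDK collections instead of
// Tajo's RelationNode. All names and data are invented for illustration.
import java.util.*;

public class RelationLookupSketch {
  private final Map<String, String> canonicalNameToRelation = new HashMap<>();
  private final Map<String, List<String>> aliasMap = new HashMap<>();

  // register a relation, optionally under an alias (mirrors addRelation)
  void addRelation(String canonicalName, String tableName, boolean hasAlias) {
    if (hasAlias) {
      aliasMap.computeIfAbsent(tableName, k -> new ArrayList<>()).add(canonicalName);
    }
    canonicalNameToRelation.put(canonicalName, canonicalName + "-node");
  }

  // resolve a name: canonical name first, then the first canonical name recorded for the alias
  String getRelation(String name) {
    if (canonicalNameToRelation.containsKey(name)) {
      return canonicalNameToRelation.get(name);
    }
    if (aliasMap.containsKey(name)) {
      return canonicalNameToRelation.get(aliasMap.get(name).get(0));
    }
    return null;
  }

  public static void main(String[] args) {
    RelationLookupSketch block = new RelationLookupSketch();
    // e.g. "SELECT t.a FROM default.table1 t" registers the aliased canonical name
    // under the original table name as well
    block.addRelation("default.t", "default.table1", true);
    System.out.println(block.getRelation("default.t"));       // canonical hit
    System.out.println(block.getRelation("default.table1"));  // resolved via the alias map
    System.out.println(block.getRelation("default.unknown")); // null
  }
}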

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
new file mode 100644
index 0000000..56863f7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanPreprocessor.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalType;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.exception.NoSuchColumnException;
+import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * It finds all relations for each block and builds base schema information.
+ */
+class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanPreprocessor.PreprocessContext, LogicalNode> {
+  private ExprAnnotator annotator;
+
+  static class PreprocessContext {
+    Session session;
+    LogicalPlan plan;
+    LogicalPlan.QueryBlock currentBlock;
+
+    public PreprocessContext(Session session, LogicalPlan plan, LogicalPlan.QueryBlock currentBlock) {
+      this.session = session;
+      this.plan = plan;
+      this.currentBlock = currentBlock;
+    }
+
+    public PreprocessContext(PreprocessContext context, LogicalPlan.QueryBlock currentBlock) {
+      this.session = context.session;
+      this.plan = context.plan;
+      this.currentBlock = currentBlock;
+    }
+  }
+
+  /** Catalog service */
+  private CatalogService catalog;
+
+  LogicalPlanPreprocessor(CatalogService catalog, ExprAnnotator annotator) {
+    this.catalog = catalog;
+    this.annotator = annotator;
+  }
+
+  @Override
+  public void preHook(PreprocessContext ctx, Stack<Expr> stack, Expr expr) throws PlanningException {
+    ctx.currentBlock.setAlgebraicExpr(expr);
+    ctx.plan.mapExprToBlock(expr, ctx.currentBlock.getName());
+  }
+
+  @Override
+  public LogicalNode postHook(PreprocessContext ctx, Stack<Expr> stack, Expr expr, LogicalNode result) throws PlanningException {
+    // For statements without a FROM clause, the result can be null. This check skips that case.
+    if (result != null) {
+      // registerNode registers each node with its query block and with the plan.
+      ctx.currentBlock.registerNode(result);
+      // It makes a map between an expr and a logical node.
+      ctx.currentBlock.registerExprWithNode(expr, result);
+    }
+    return result;
+  }
+
+  /**
+   * Gets all columns of the relations corresponding to the asterisk expression.
+   * @param ctx
+   * @param asteriskExpr
+   * @return array of columns
+   * @throws PlanningException
+   */
+  public static Column[] getColumns(PreprocessContext ctx, QualifiedAsteriskExpr asteriskExpr)
+      throws PlanningException {
+    RelationNode relationOp = null;
+    QueryBlock block = ctx.currentBlock;
+    Collection<QueryBlock> queryBlocks = ctx.plan.getQueryBlocks();
+    if (asteriskExpr.hasQualifier()) {
+      String qualifier;
+
+      if (CatalogUtil.isFQTableName(asteriskExpr.getQualifier())) {
+        qualifier = asteriskExpr.getQualifier();
+      } else {
+        qualifier = CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), asteriskExpr.getQualifier());
+      }
+
+      relationOp = block.getRelation(qualifier);
+
+      // if the qualifier refers to a relation outside of this query block
+      if (relationOp == null) {
+        // TODO - can a nested query refer only to its outer query block, or not?
+        for (QueryBlock eachBlock : queryBlocks) {
+          if (eachBlock.existsRelation(qualifier)) {
+            relationOp = eachBlock.getRelation(qualifier);
+          }
+        }
+      }
+
+      // If we cannot find any relation against a qualified column name
+      if (relationOp == null) {
+        throw new NoSuchColumnException(CatalogUtil.buildFQName(qualifier, "*"));
+      }
+
+      Schema schema = relationOp.getTableSchema();
+      Column[] resolvedColumns = new Column[schema.size()];
+      return schema.getColumns().toArray(resolvedColumns);
+    } else { // if a column reference is not qualified
+      // columns of every relation should be resolved.
+      Iterator<RelationNode> iterator = block.getRelations().iterator();
+      Schema schema;
+      List<Column> resolvedColumns = TUtil.newList();
+
+      while (iterator.hasNext()) {
+        relationOp = iterator.next();
+        schema = relationOp.getTableSchema();
+        resolvedColumns.addAll(schema.getColumns());
+      }
+
+      if (resolvedColumns.size() == 0) {
+        throw new NoSuchColumnException(asteriskExpr.toString());
+      }
+
+      return resolvedColumns.toArray(new Column[resolvedColumns.size()]);
+    }
+  }
+
+  /**
+   * Resolves an asterisk expression into concrete column reference expressions.
+   * @param ctx context
+   * @param asteriskExpr asterisk expression
+   * @return a list of NamedExpr each of which has ColumnReferenceExprs as its child
+   * @throws PlanningException
+   */
+  private static List<NamedExpr> resolveAsterisk(PreprocessContext ctx, QualifiedAsteriskExpr asteriskExpr)
+      throws PlanningException {
+    Column[] columns = getColumns(ctx, asteriskExpr);
+    List<NamedExpr> newTargetExprs = new ArrayList<NamedExpr>(columns.length);
+    for (int i = 0; i < columns.length; i++) {
+      newTargetExprs.add(new NamedExpr(new ColumnReferenceExpr(columns[i].getQualifier(), columns[i].getSimpleName())));
+    }
+    return newTargetExprs;
+  }
+
+  private static boolean hasAsterisk(Projection projection) {
+    for (NamedExpr eachTarget : projection.getNamedExprs()) {
+      if (eachTarget.getExpr().getType() == OpType.Asterisk) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalNode visitProjection(PreprocessContext ctx, Stack<Expr> stack, Projection expr) throws PlanningException {
+    // If the statement has no FROM clause, it returns immediately.
+    if (!expr.hasChild()) {
+      return ctx.plan.createNode(EvalExprNode.class);
+    }
+
+    stack.push(expr); // <--- push
+    LogicalNode child = visit(ctx, stack, expr.getChild());
+
+    // Resolve the asterisk expression
+    if (hasAsterisk(expr)) {
+      List<NamedExpr> rewrittenTargets = TUtil.newList();
+      for (NamedExpr originTarget : expr.getNamedExprs()) {
+        if (originTarget.getExpr().getType() == OpType.Asterisk) {
+          // rewrite targets
+          rewrittenTargets.addAll(resolveAsterisk(ctx, (QualifiedAsteriskExpr) originTarget.getExpr()));
+        } else {
+          rewrittenTargets.add(originTarget);
+        }
+      }
+      expr.setNamedExprs(rewrittenTargets.toArray(new NamedExpr[rewrittenTargets.size()]));
+    }
+
+    NamedExpr[] projectTargetExprs = expr.getNamedExprs();
+
+    Target[] targets = new Target[projectTargetExprs.length];
+
+    for (int i = 0; i < expr.getNamedExprs().length; i++) {
+      NamedExpr namedExpr = expr.getNamedExprs()[i];
+      EvalNode evalNode = annotator.createEvalNode(ctx.plan, ctx.currentBlock, namedExpr.getExpr());
+
+      if (namedExpr.hasAlias()) {
+        targets[i] = new Target(evalNode, namedExpr.getAlias());
+      } else if (evalNode.getType() == EvalType.FIELD) {
+        targets[i] = new Target((FieldEval) evalNode);
+      } else {
+        String generatedName = ctx.plan.generateUniqueColumnName(namedExpr.getExpr());
+        targets[i] = new Target(evalNode, generatedName);
+        namedExpr.setAlias(generatedName);
+      }
+    }
+    stack.pop(); // <--- Pop
+
+    ProjectionNode projectionNode = ctx.plan.createNode(ProjectionNode.class);
+    projectionNode.setInSchema(child.getOutSchema());
+    projectionNode.setOutSchema(PlannerUtil.targetToSchema(targets));
+    return projectionNode;
+  }
+
+  @Override
+  public LogicalNode visitLimit(PreprocessContext ctx, Stack<Expr> stack, Limit expr) throws PlanningException {
+    stack.push(expr);
+    LogicalNode child = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    LimitNode limitNode = ctx.plan.createNode(LimitNode.class);
+    limitNode.setInSchema(child.getOutSchema());
+    limitNode.setOutSchema(child.getOutSchema());
+    return limitNode;
+  }
+
+  @Override
+  public LogicalNode visitSort(PreprocessContext ctx, Stack<Expr> stack, Sort expr) throws PlanningException {
+    stack.push(expr);
+    LogicalNode child = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    SortNode sortNode = ctx.plan.createNode(SortNode.class);
+    sortNode.setInSchema(child.getOutSchema());
+    sortNode.setOutSchema(child.getOutSchema());
+    return sortNode;
+  }
+
+  @Override
+  public LogicalNode visitHaving(PreprocessContext ctx, Stack<Expr> stack, Having expr) throws PlanningException {
+    stack.push(expr);
+    LogicalNode child = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    HavingNode havingNode = ctx.plan.createNode(HavingNode.class);
+    havingNode.setInSchema(child.getOutSchema());
+    havingNode.setOutSchema(child.getOutSchema());
+    return havingNode;
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(PreprocessContext ctx, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+    stack.push(expr); // <--- push
+    LogicalNode child = visit(ctx, stack, expr.getChild());
+
+    Projection projection = ctx.currentBlock.getSingletonExpr(OpType.Projection);
+    int finalTargetNum = projection.getNamedExprs().length;
+    Target [] targets = new Target[finalTargetNum];
+
+    for (int i = 0; i < finalTargetNum; i++) {
+      NamedExpr namedExpr = projection.getNamedExprs()[i];
+      EvalNode evalNode = annotator.createEvalNode(ctx.plan, ctx.currentBlock, namedExpr.getExpr());
+
+      if (namedExpr.hasAlias()) {
+        targets[i] = new Target(evalNode, namedExpr.getAlias());
+      } else {
+        targets[i] = new Target(evalNode, "?name_" + i);
+      }
+    }
+    stack.pop();
+
+    GroupbyNode groupByNode = ctx.plan.createNode(GroupbyNode.class);
+    groupByNode.setInSchema(child.getOutSchema());
+    groupByNode.setOutSchema(PlannerUtil.targetToSchema(targets));
+    return groupByNode;
+  }
+
+  @Override
+  public LogicalNode visitUnion(PreprocessContext ctx, Stack<Expr> stack, SetOperation expr) throws PlanningException {
+    LogicalPlan.QueryBlock leftBlock = ctx.plan.newQueryBlock();
+    PreprocessContext leftContext = new PreprocessContext(ctx, leftBlock);
+    LogicalNode leftChild = visit(leftContext, new Stack<Expr>(), expr.getLeft());
+    ctx.currentBlock.registerExprWithNode(expr.getLeft(), leftChild);
+
+    LogicalPlan.QueryBlock rightBlock = ctx.plan.newQueryBlock();
+    PreprocessContext rightContext = new PreprocessContext(ctx, rightBlock);
+    LogicalNode rightChild = visit(rightContext, new Stack<Expr>(), expr.getRight());
+    ctx.currentBlock.registerExprWithNode(expr.getRight(), rightChild);
+
+    UnionNode unionNode = new UnionNode(ctx.plan.newPID());
+    unionNode.setLeftChild(leftChild);
+    unionNode.setRightChild(rightChild);
+    unionNode.setInSchema(leftChild.getOutSchema());
+    unionNode.setOutSchema(leftChild.getOutSchema());
+
+    return unionNode;
+  }
+
+  public LogicalNode visitFilter(PreprocessContext ctx, Stack<Expr> stack, Selection expr) throws PlanningException {
+    stack.push(expr);
+    LogicalNode child = visit(ctx, stack, expr.getChild());
+    stack.pop();
+
+    SelectionNode selectionNode = ctx.plan.createNode(SelectionNode.class);
+    selectionNode.setInSchema(child.getOutSchema());
+    selectionNode.setOutSchema(child.getOutSchema());
+    return selectionNode;
+  }
+
+  @Override
+  public LogicalNode visitJoin(PreprocessContext ctx, Stack<Expr> stack, Join expr) throws PlanningException {
+    stack.push(expr);
+    LogicalNode left = visit(ctx, stack, expr.getLeft());
+    LogicalNode right = visit(ctx, stack, expr.getRight());
+    stack.pop();
+    JoinNode joinNode = ctx.plan.createNode(JoinNode.class);
+    joinNode.setJoinType(expr.getJoinType());
+    Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
+    joinNode.setInSchema(merged);
+    joinNode.setOutSchema(merged);
+
+    ctx.currentBlock.addJoinType(expr.getJoinType());
+    return joinNode;
+  }
+
+  @Override
+  public LogicalNode visitRelation(PreprocessContext ctx, Stack<Expr> stack, Relation expr)
+      throws PlanningException {
+    Relation relation = expr;
+
+    String actualRelationName;
+    if (CatalogUtil.isFQTableName(expr.getName())) {
+      actualRelationName = relation.getName();
+    } else {
+      actualRelationName = CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), relation.getName());
+    }
+
+    TableDesc desc = catalog.getTableDesc(actualRelationName);
+    ScanNode scanNode = ctx.plan.createNode(ScanNode.class);
+    if (relation.hasAlias()) {
+      scanNode.init(desc, relation.getAlias());
+    } else {
+      scanNode.init(desc);
+    }
+    ctx.currentBlock.addRelation(scanNode);
+
+    return scanNode;
+  }
+
+  @Override
+  public LogicalNode visitTableSubQuery(PreprocessContext ctx, Stack<Expr> stack, TablePrimarySubQuery expr)
+      throws PlanningException {
+
+    PreprocessContext newContext;
+    // Note: TableSubQuery always has a table name.
+    // SELECT .... FROM (SELECT ...) TB_NAME <-
+    newContext = new PreprocessContext(ctx, ctx.plan.newQueryBlock());
+    LogicalNode child = super.visitTableSubQuery(newContext, stack, expr);
+
+    // a table subquery should be treated as a relation.
+    TableSubQueryNode node = ctx.plan.createNode(TableSubQueryNode.class);
+    node.init(CatalogUtil.buildFQName(ctx.session.getCurrentDatabase(), expr.getName()), child);
+    ctx.currentBlock.addRelation(node);
+    return node;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Data Definition Language Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  @Override
+  public LogicalNode visitCreateDatabase(PreprocessContext ctx, Stack<Expr> stack, CreateDatabase expr)
+      throws PlanningException {
+    CreateDatabaseNode createDatabaseNode = ctx.plan.createNode(CreateDatabaseNode.class);
+    return createDatabaseNode;
+  }
+
+  @Override
+  public LogicalNode visitDropDatabase(PreprocessContext ctx, Stack<Expr> stack, DropDatabase expr)
+      throws PlanningException {
+    DropDatabaseNode dropDatabaseNode = ctx.plan.createNode(DropDatabaseNode.class);
+    return dropDatabaseNode;
+  }
+
+  @Override
+  public LogicalNode visitCreateTable(PreprocessContext ctx, Stack<Expr> stack, CreateTable expr)
+      throws PlanningException {
+
+    CreateTableNode createTableNode = ctx.plan.createNode(CreateTableNode.class);
+
+    if (expr.hasSubQuery()) {
+      stack.push(expr);
+      visit(ctx, stack, expr.getSubQuery());
+      stack.pop();
+    }
+
+    return createTableNode;
+  }
+
+  @Override
+  public LogicalNode visitDropTable(PreprocessContext ctx, Stack<Expr> stack, DropTable expr)
+      throws PlanningException {
+    DropTableNode dropTable = ctx.plan.createNode(DropTableNode.class);
+    return dropTable;
+  }
+
+  @Override
+  public LogicalNode visitAlterTablespace(PreprocessContext ctx, Stack<Expr> stack, AlterTablespace expr)
+      throws PlanningException {
+    AlterTablespaceNode alterTablespace = ctx.plan.createNode(AlterTablespaceNode.class);
+    return alterTablespace;
+  }
+
+  @Override
+  public LogicalNode visitAlterTable(PreprocessContext ctx, Stack<Expr> stack, AlterTable expr)
+      throws PlanningException {
+    AlterTableNode alterTableNode = ctx.plan.createNode(AlterTableNode.class);
+    return alterTableNode;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Insert or Update Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public LogicalNode visitInsert(PreprocessContext ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
+    LogicalNode child = super.visitInsert(ctx, stack, expr);
+
+    InsertNode insertNode = new InsertNode(ctx.plan.newPID());
+    insertNode.setInSchema(child.getOutSchema());
+    insertNode.setOutSchema(child.getOutSchema());
+    return insertNode;
+  }
+}
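As a quick illustration of the asterisk handling added above (hasAsterisk, getColumns, resolveAsterisk), the following standalone sketch uses only JDK collections and invented names: a qualified asterisk such as "t.*" expands to the columns of a single relation, while a bare "*" expands to the columns of every relation registered in the block.

// Standalone sketch (illustrative only): reproduces the effect of the asterisk expansion above
// with plain JDK collections. A qualified asterisk expands to one relation's columns; an
// unqualified asterisk expands to the columns of every relation in the block. Names are invented.
import java.util.*;

public class AsteriskExpansionSketch {
  // relation canonical name -> column simple names
  private final Map<String, List<String>> relations = new LinkedHashMap<>();

  void addRelation(String canonicalName, String... columns) {
    relations.put(canonicalName, Arrays.asList(columns));
  }

  // expand "qualifier.*" (qualifier != null) or "*" (qualifier == null) into qualified column names
  List<String> expandAsterisk(String qualifier) {
    List<String> expanded = new ArrayList<>();
    if (qualifier != null) {
      List<String> columns = relations.get(qualifier);
      if (columns == null) {
        throw new IllegalArgumentException("no such relation: " + qualifier + ".*");
      }
      for (String column : columns) {
        expanded.add(qualifier + "." + column);
      }
    } else {
      for (Map.Entry<String, List<String>> relation : relations.entrySet()) {
        for (String column : relation.getValue()) {
          expanded.add(relation.getKey() + "." + column);
        }
      }
    }
    return expanded;
  }

  public static void main(String[] args) {
    AsteriskExpansionSketch block = new AsteriskExpansionSketch();
    block.addRelation("default.orders", "id", "price", "rate");
    block.addRelation("default.items", "id", "name");
    // SELECT default.orders.* ... -> [default.orders.id, default.orders.price, default.orders.rate]
    System.out.println(block.expandAsterisk("default.orders"));
    // SELECT * ... -> columns of every relation in the block
    System.out.println(block.expandAsterisk(null));
  }
}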

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
new file mode 100644
index 0000000..bb8192f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.master.session.Session;
+
+import java.util.Stack;
+
+public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<LogicalPlanVerifier.Context, LogicalNode> {
+  private TajoConf conf;
+  private CatalogService catalog;
+
+  public LogicalPlanVerifier(TajoConf conf, CatalogService catalog) {
+    this.conf = conf;
+    this.catalog = catalog;
+  }
+
+  public static class Context {
+    Session session;
+    VerificationState state;
+
+    public Context(Session session, VerificationState state) {
+      this.session = session;
+      this.state = state;
+    }
+  }
+
+  public VerificationState verify(Session session, VerificationState state, LogicalPlan plan) throws PlanningException {
+    Context context = new Context(session, state);
+    visit(context, plan, plan.getRootBlock());
+    return context.state;
+  }
+
+  /**
+   * Checks whether the output schema of a projectable node and the output data types of its targets are equivalent.
+   */
+  private static void verifyProjectableOutputSchema(Projectable node) throws PlanningException {
+
+    Schema outputSchema = node.getOutSchema();
+    Schema targetSchema = PlannerUtil.targetToSchema(node.getTargets());
+
+    if (outputSchema.size() != node.getTargets().length) {
+      throw new PlanningException(String.format("Output schema and Target's schema are mismatched at Node (%d)",
+          node.getPID()));
+    }
+
+    for (int i = 0; i < outputSchema.size(); i++) {
+      if (!outputSchema.getColumn(i).getDataType().equals(targetSchema.getColumn(i).getDataType())) {
+        Column targetColumn = targetSchema.getColumn(i);
+        Column insertColumn = outputSchema.getColumn(i);
+        throw new PlanningException("ERROR: " +
+            insertColumn.getSimpleName() + " is of type " + insertColumn.getDataType().getType().name() +
+            ", but target column '" + targetColumn.getSimpleName() + "' is of type " +
+            targetColumn.getDataType().getType().name());
+      }
+    }
+  }
+
+  @Override
+  public LogicalNode visitProjection(Context state, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitProjection(state, plan, block, node, stack);
+
+    for (Target target : node.getTargets()) {
+      ExprsVerifier.verify(state.state, node, target.getEvalTree());
+    }
+
+    verifyProjectableOutputSchema(node);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitLimit(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                LimitNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitLimit(context, plan, block, node, stack);
+
+    if (node.getFetchFirstNum() < 0) {
+      context.state.addVerification("LIMIT must not be negative");
+    }
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                  GroupbyNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitGroupBy(context, plan, block, node, stack);
+
+    verifyProjectableOutputSchema(node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    visit(context, plan, block, node.getChild(), stack);
+    ExprsVerifier.verify(context.state, node, node.getQual());
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    visit(context, plan, block, node.getLeftChild(), stack);
+    visit(context, plan, block, node.getRightChild(), stack);
+
+    if (node.hasJoinQual()) {
+      ExprsVerifier.verify(context.state, node, node.getJoinQual());
+    }
+
+    verifyProjectableOutputSchema(node);
+
+    return node;
+  }
+
+  private void verifySetStatement(VerificationState state, BinaryNode setNode) {
+    Preconditions.checkArgument(setNode.getType() == NodeType.UNION || setNode.getType() == NodeType.INTERSECT ||
+      setNode.getType() == NodeType.EXCEPT);
+    Schema left = setNode.getLeftChild().getOutSchema();
+    Schema right = setNode.getRightChild().getOutSchema();
+    NodeType type = setNode.getType();
+
+    if (left.size() != right.size()) {
+      state.addVerification("each " + type.name() + " query must have the same number of columns");
+      return;
+    }
+
+    Column[] leftColumns = left.toArray();
+    Column[] rightColumns = right.toArray();
+
+    for (int i = 0; i < leftColumns.length; i++) {
+      if (!leftColumns[i].getDataType().equals(rightColumns[i].getDataType())) {
+        state.addVerification(type + " types " + leftColumns[i].getDataType().getType() + " and "
+            + rightColumns[i].getDataType().getType() + " cannot be matched");
+      }
+    }
+  }
+
+  @Override
+  public LogicalNode visitUnion(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                UnionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitUnion(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitExcept(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 ExceptNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitExcept(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitIntersect(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    IntersectNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitIntersect(context, plan, block, node, stack);
+    verifySetStatement(context.state, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    if (node.hasTargets()) {
+      for (Target target : node.getTargets()) {
+        ExprsVerifier.verify(context.state, node, target.getEvalTree());
+      }
+    }
+
+    if (node.hasQual()) {
+      ExprsVerifier.verify(context.state, node, node.getQual());
+    }
+
+    verifyProjectableOutputSchema(node);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitStoreTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     StoreTableNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitStoreTable(context, plan, block, node, stack);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 InsertNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitInsert(context, plan, block, node, stack);
+    return node;
+  }
+
+  /**
+   * This ensures that corresponding columns in both tables have equivalent data types.
+   */
+  private static void ensureDomains(VerificationState state, Schema targetTableScheme, Schema schema)
+      throws PlanningException {
+    for (int i = 0; i < schema.size(); i++) {
+      if (!schema.getColumn(i).getDataType().equals(targetTableScheme.getColumn(i).getDataType())) {
+        Column targetColumn = targetTableScheme.getColumn(i);
+        Column insertColumn = schema.getColumn(i);
+        state.addVerification("ERROR: " +
+            insertColumn.getSimpleName() + " is of type " + insertColumn.getDataType().getType().name() +
+            ", but target column '" + targetColumn.getSimpleName() + "' is of type " +
+            targetColumn.getDataType().getType().name());
+      }
+    }
+  }
+
+  @Override
+  public LogicalNode visitCreateTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      CreateTableNode node, Stack<LogicalNode> stack) throws PlanningException {
+    super.visitCreateTable(context, plan, block, node, stack);
+    // Here, we don't need to check table existence because that check is performed in PreLogicalPlanVerifier.
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitDropTable(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    DropTableNode node, Stack<LogicalNode> stack) {
+    // Here, we don't need to check table existence because that check is performed in PreLogicalPlanVerifier.
+    return node;
+  }
+}
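To illustrate the checks that verifySetStatement performs above, here is a standalone sketch with invented names and plain string types: both inputs of a set operation must have the same number of columns, and the column types must match position by position.

// Standalone sketch (illustrative only): applies the same two rules as verifySetStatement above.
// Both sides of a UNION/INTERSECT/EXCEPT must have the same number of columns, and the column
// types must match pairwise. Plain strings stand in for Tajo's Schema/Column types.
import java.util.*;

public class SetStatementCheckSketch {

  static List<String> verify(String setType, List<String> leftTypes, List<String> rightTypes) {
    List<String> errors = new ArrayList<>();
    if (leftTypes.size() != rightTypes.size()) {
      errors.add("each " + setType + " query must have the same number of columns");
      return errors;
    }
    for (int i = 0; i < leftTypes.size(); i++) {
      if (!leftTypes.get(i).equals(rightTypes.get(i))) {
        errors.add(setType + " types " + leftTypes.get(i) + " and " + rightTypes.get(i)
            + " cannot be matched");
      }
    }
    return errors;
  }

  public static void main(String[] args) {
    // e.g. SELECT int4_col, text_col ... UNION SELECT int4_col, float8_col ...
    System.out.println(verify("UNION",
        Arrays.asList("INT4", "TEXT"),
        Arrays.asList("INT4", "FLOAT8"))); // -> [UNION types TEXT and FLOAT8 cannot be matched]

    // mismatched column counts are reported before any type check
    System.out.println(verify("UNION",
        Arrays.asList("INT4"),
        Arrays.asList("INT4", "TEXT")));
  }
}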

