tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [06/12] TAJO-501: Rewrite the projection part of logical planning.
Date Fri, 17 Jan 2014 09:23:36 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8e1f989a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index be24112..e27cc04 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.engine.planner;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -34,66 +34,59 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.TimestampDatum;
-import org.apache.tajo.datum.TimeDatum;
-import org.apache.tajo.datum.DateDatum;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
-import org.apache.tajo.engine.exception.UndefinedFunctionException;
 import org.apache.tajo.engine.exception.VerifyException;
-import org.apache.tajo.engine.function.AggFunction;
-import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.SchemaUtil;
-import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.util.TUtil;
-import org.joda.time.DateTime;
 
-import java.util.Collection;
-import java.util.List;
-import java.util.Stack;
+import java.util.*;
 
 import static org.apache.tajo.algebra.Aggregation.GroupType;
 import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
 import static org.apache.tajo.algebra.CreateTable.PartitionType;
-
-import org.apache.tajo.algebra.DateValue;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
+import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
 import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
+import static org.apache.tajo.engine.planner.LogicalPlanPreprocessor.PreprocessContext;
 
 /**
- * This class creates a logical plan from a parse tree ({@link org.apache.tajo.engine.parser.SQLAnalyzer})
- * generated by {@link org.apache.tajo.engine.parser.SQLAnalyzer}.
- *
- * Relational operators can be divided into two categories as follows:
- * <oi>
- *  <li>General operator: this type operators do not affect the tuple schema.
- *  Selection, Sort, and Limit belong to this type.</li>
- *  <li>Projectable operator: this type operators affects the tuple schema.
- *  Scan, Groupby, and Join belong to this type.
- *  </li>
- * </oi>
+ * This class creates a logical plan from a nested tajo algebra expression ({@link org.apache.tajo.algebra})
  */
 public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContext, LogicalNode> {
   private static Log LOG = LogFactory.getLog(LogicalPlanner.class);
   private final CatalogService catalog;
+  private final LogicalPlanPreprocessor preprocessor;
+  private final ExprAnnotator exprAnnotator;
+  private final ExprNormalizer normalizer;
 
   public LogicalPlanner(CatalogService catalog) {
     this.catalog = catalog;
+    this.exprAnnotator = new ExprAnnotator(catalog);
+    this.preprocessor = new LogicalPlanPreprocessor(catalog, exprAnnotator);
+    this.normalizer = new ExprNormalizer();
   }
 
-  public static class PlanContext {
+  public class PlanContext {
     LogicalPlan plan;
-    QueryBlock block;
+
+    // transient data for each query block
+    QueryBlock queryBlock;
 
     public PlanContext(LogicalPlan plan, QueryBlock block) {
       this.plan = plan;
-      this.block = block;
+      this.queryBlock = block;
+    }
+
+    public PlanContext(PlanContext context, QueryBlock block) {
+      this.plan = context.plan;
+      this.queryBlock = block;
+    }
+
+    public String toString() {
+      return "block=" + queryBlock.getName() + ", relNum=" + queryBlock.getRelations().size() + ", "+
+          queryBlock.namedExprsMgr.toString();
     }
   }
 
@@ -106,328 +99,399 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   public LogicalPlan createPlan(Expr expr) throws PlanningException {
 
     LogicalPlan plan = new LogicalPlan(this);
-    LogicalNode subroot;
-
-    Stack<OpType> stack = new Stack<OpType>();
 
     QueryBlock rootBlock = plan.newAndGetBlock(LogicalPlan.ROOT_BLOCK);
-    PlanContext context = new PlanContext(plan, rootBlock);
-    subroot = visit(context, stack, expr);
+    PreprocessContext preProcessorCtx = new PreprocessContext(plan, rootBlock);
+    preprocessor.visit(preProcessorCtx, new Stack<Expr>(), expr);
+
+    PlanContext context = new PlanContext(plan, plan.getRootBlock());
+    LogicalNode topMostNode = this.visit(context, new Stack<Expr>(), expr);
 
+    // Add Root Node
     LogicalRootNode root = new LogicalRootNode(plan.newPID());
-    root.setInSchema(subroot.getOutSchema());
-    root.setOutSchema(subroot.getOutSchema());
-    root.setChild(subroot);
+    root.setInSchema(topMostNode.getOutSchema());
+    root.setOutSchema(topMostNode.getOutSchema());
+    root.setChild(topMostNode);
     plan.getRootBlock().setRoot(root);
 
     return plan;
   }
 
-  public void preHook(PlanContext context, Stack<OpType> stack, Expr expr) {
-    context.block = checkIfNewBlockOrGet(context.plan, context.block.getName());
-    context.block.setAlgebraicExpr(expr);
-  }
-
-  public LogicalNode postHook(PlanContext context, Stack<OpType> stack, Expr expr, LogicalNode current)
-      throws PlanningException {
-    // Post work
-    if ((expr.getType() == OpType.RelationList && ((RelationList) expr).size() == 1)
-        || expr.getType() == OpType.Having) {
-      return current;
-    }
-
-    // mark the node as the visited node and do post work for each operator
-    context.block.postVisit(current, stack);
-    if (current instanceof Projectable) {
-      // check and set evaluated targets and update in/out schemas
-      context.block.checkAndResolveTargets(current);
-    }
-
-    return current;
+  public ExprAnnotator getExprAnnotator() {
+    return this.exprAnnotator;
   }
 
-  /**
-   * It checks if the first node in this query block. If not, it creates and adds a new query block.
-   * In addition, it always returns the query block corresponding to the block name.
-   */
-  private QueryBlock checkIfNewBlockOrGet(LogicalPlan plan, String blockName) {
-    QueryBlock block = plan.getBlock(blockName);
-    if (block == null) {
-      return plan.newAndGetBlock(blockName);
-    } else {
-      return block;
-    }
+  public void preHook(PlanContext context, Stack<Expr> stack, Expr expr) throws PlanningException {
+    context.queryBlock.updateCurrentNode(expr);
   }
 
-  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<OpType> stack, TablePrimarySubQuery expr)
+  public LogicalNode postHook(PlanContext context, Stack<Expr> stack, Expr expr, LogicalNode current)
       throws PlanningException {
-    QueryBlock newBlock = context.plan.newAndGetBlock(expr.getName());
-    PlanContext newContext = new PlanContext(context.plan, newBlock);
-    Stack<OpType> newStack = new Stack<OpType>();
-    LogicalNode child = visit(newContext, newStack, expr.getSubQuery());
-    context.plan.connectBlocks(newContext.block, context.block, BlockType.TableSubQuery);
-    return new TableSubQueryNode(context.plan.newPID(), expr.getName(), child);
-  }
-
 
-  @Override
-  public ScanNode visitRelation(PlanContext context, Stack<OpType> stack, Relation expr)
-      throws VerifyException {
-    // 1. init phase
 
-    // 2. build child plans
-    // 3. build scan plan
-    Relation relation = expr;
-    TableDesc desc = catalog.getTableDesc(relation.getName());
-    if (!desc.hasStats()) {
-      updatePhysicalInfo(desc);
+    // This checking determines whether current logical node is already visited or not.
+    // generated logical nodes (e.g., implicit aggregation) without exprs will pass NULL as an expr parameter.
+    // We should skip them.
+    if (expr != null) {
+      // A relation list including a single ScanNode will return a ScanNode instance that already passed postHook.
+      // So, it skips the already-visited ScanNode instance.
+      if (expr.getType() == OpType.RelationList && current.getType() == NodeType.SCAN) {
+        return current;
+      }
     }
 
-    ScanNode scanNode;
-    if (relation.hasAlias()) {
-      scanNode = new ScanNode(context.plan.newPID(), desc, relation.getAlias());
-    } else {
-      scanNode = new ScanNode(context.plan.newPID(), desc);
+    QueryBlock queryBlock = context.queryBlock;
+    queryBlock.updateLatestNode(current);
+    if (current.getType() == NodeType.GROUP_BY) {
+      queryBlock.unsetAggregationRequire();
     }
 
-    return scanNode;
-  }
+    // if this node is the topmost
+    if (stack.size() == 0) {
+      queryBlock.setRoot(current);
+    }
 
-  private void updatePhysicalInfo(TableDesc desc) {
-    if (desc.getPath() != null) {
-      try {
-        FileSystem fs = desc.getPath().getFileSystem(new Configuration());
-        FileStatus status = fs.getFileStatus(desc.getPath());
-        if (desc.getStats() != null && (status.isDirectory() || status.isFile())) {
-          ContentSummary summary = fs.getContentSummary(desc.getPath());
-          if (summary != null) {
-            long volume = summary.getLength();
-            desc.getStats().setNumBytes(volume);
-          }
-        }
-      } catch (Throwable t) {
-        LOG.warn(t);
-      }
+    if (!stack.empty()) {
+      queryBlock.updateCurrentNode(stack.peek());
     }
+    return current;
   }
 
   /*===============================================================================================
-    JOIN SECTION
+    Data Manipulation Language (DML) SECTION
+   ===============================================================================================*/
+
+
+  /*===============================================================================================
+    PROJECTION SECTION
    ===============================================================================================*/
   @Override
-  public LogicalNode visitRelationList(PlanContext context, Stack<OpType> stack, RelationList relations)
+  public LogicalNode visitProjection(PlanContext context, Stack<Expr> stack, Projection projection)
       throws PlanningException {
 
-    LogicalNode current = visit(context, stack, relations.getRelations()[0]);
 
-    LogicalNode left;
-    LogicalNode right;
-    if (relations.size() > 1) {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
 
-      for (int i = 1; i < relations.size(); i++) {
-        left = current;
-        right = visit(context, stack, relations.getRelations()[i]);
-        current = createCatasianProduct(context.plan, left, right);
-      }
+    // If a non-from statement is given
+    if (!projection.hasChild()) {
+      return buildPlanForNoneFromStatement(context, stack, projection);
     }
 
-    return current;
-  }
-
-  @Override
-  public LogicalNode visitJoin(PlanContext context, Stack<OpType> stack, Join join)
-      throws PlanningException {
-    // Phase 1: Init
-    LogicalPlan plan = context.plan;
-    QueryBlock block = context.block;
+    String [] referenceNames;
+    if (projection.isAllProjected()) {
+      referenceNames = null;
+    } else {
+      // in the prephase, insert the whole target list into the NamedExprsManager.
+      // Then it gets reference names, each of which points to an expression in the target list.
+      referenceNames = doProjectionPrephase(context, projection);
+    }
 
-    // Phase 2: build child plans
-    stack.push(OpType.Join);
-    LogicalNode left = visit(context, stack, join.getLeft());
-    LogicalNode right = visit(context, stack, join.getRight());
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(projection);
+    LogicalNode child = visit(context, stack, projection.getChild());
+    // check if it is aggregation query without group-by clause. If so, it inserts group-by node to its child.
+    child = insertGroupbyNodeIfUnresolved(context, child, stack);
     stack.pop();
+    ////////////////////////////////////////////////////////
 
-    // Phase 3: build this plan
-    JoinNode joinNode = new JoinNode(plan.newPID(), join.getJoinType(), left, right);
-
-    // Set A merged input schema
-    Schema merged;
-    if (join.isNatural()) {
-      merged = getNaturalJoin(left, right);
+    ProjectionNode projectionNode;
+    Target [] targets;
+    if (projection.isAllProjected()) {
+      // should take all columns except for generated columns whose names are prefixed with '$'.
+      targets = PlannerUtil.schemaToTargetsWithGeneratedFields(child.getOutSchema());
     } else {
-      merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
+      targets = buildTargets(plan, block, referenceNames);
     }
-    joinNode.setInSchema(merged);
-    joinNode.setOutSchema(merged);
 
-    // Determine join conditions
-    if (join.isNatural()) { // if natural join, it should have the equi-join conditions by common column names
-      Schema leftSchema = joinNode.getLeftChild().getInSchema();
-      Schema rightSchema = joinNode.getRightChild().getInSchema();
-      Schema commons = SchemaUtil.getCommons(leftSchema, rightSchema);
-      EvalNode njCond = getNaturalJoinCondition(leftSchema, rightSchema, commons);
-      joinNode.setJoinQual(njCond);
-    } else if (join.hasQual()) { // otherwise, the given join conditions are set
-      joinNode.setJoinQual(createEvalTree(plan, block, join.getQual()));
+    // Set ProjectionNode
+    projectionNode = context.queryBlock.getNodeFromExpr(projection);
+    projectionNode.setTargets(targets);
+    projectionNode.setOutSchema(PlannerUtil.targetToSchema(projectionNode.getTargets()));
+    projectionNode.setInSchema(child.getOutSchema());
+    projectionNode.setChild(child);
+
+    if (projection.isDistinct() && block.hasNode(NodeType.GROUP_BY)) {
+      throw new VerifyException("Cannot support grouping and distinct at the same time yet");
+    } else {
+      if (projection.isDistinct()) {
+        insertDistinctOperator(context, projectionNode, child);
+      }
     }
 
-    return joinNode;
+    // It's for debugging and unit tests purpose.
+    // It sets raw targets, all of them are raw expressions instead of references.
+    setRawTargets(context, targets, referenceNames, projection);
+
+    return projectionNode;
   }
 
-  private static EvalNode getNaturalJoinCondition(Schema outer, Schema inner, Schema commons) {
-    EvalNode njQual = null;
-    EvalNode equiQual;
+  private void setRawTargets(PlanContext context, Target[] targets, String[] referenceNames,
+                             Projection projection) throws PlanningException {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
 
-    Column leftJoinKey;
-    Column rightJoinKey;
-    for (Column common : commons.getColumns()) {
-      leftJoinKey = outer.getColumnByName(common.getColumnName());
-      rightJoinKey = inner.getColumnByName(common.getColumnName());
-      equiQual = new BinaryEval(EvalType.EQUAL,
-          new FieldEval(leftJoinKey), new FieldEval(rightJoinKey));
-      if (njQual == null) {
-        njQual = equiQual;
-      } else {
-        njQual = new BinaryEval(EvalType.AND,
-            njQual, equiQual);
+    if (projection.isAllProjected()) {
+      block.setUnresolvedTargets(targets);
+    } else {
+      // It's for debugging or unit tests.
+      Target [] unresolvedTargets = new Target[projection.getNamedExprs().length];
+      for (int i = 0; i < targets.length; i++) {
+        NamedExpr namedExpr = projection.getNamedExprs()[i];
+        EvalNode evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+        unresolvedTargets[i] = new Target(evalNode, referenceNames[i]);
       }
+      // it's for debugging or unit testing
+      block.setUnresolvedTargets(unresolvedTargets);
     }
-
-    return njQual;
   }
 
-  private static LogicalNode createCatasianProduct(LogicalPlan plan, LogicalNode left, LogicalNode right) {
-    JoinNode join = new JoinNode(plan.newPID(), JoinType.CROSS, left, right);
-    Schema joinSchema = SchemaUtil.merge(
-        join.getLeftChild().getOutSchema(),
-        join.getRightChild().getOutSchema());
-    join.setInSchema(joinSchema);
-    join.setOutSchema(joinSchema);
+  private void insertDistinctOperator(PlanContext context, ProjectionNode projectionNode, LogicalNode child) {
+    LogicalPlan plan = context.plan;
 
-    return join;
+    Schema outSchema = projectionNode.getOutSchema();
+    GroupbyNode dupRemoval = new GroupbyNode(plan.newPID());
+    dupRemoval.setGroupingColumns(outSchema.toArray());
+    dupRemoval.setTargets(PlannerUtil.schemaToTargets(outSchema));
+    dupRemoval.setInSchema(projectionNode.getInSchema());
+    dupRemoval.setOutSchema(outSchema);
+    dupRemoval.setChild(child);
+    projectionNode.setChild(dupRemoval);
+    projectionNode.setInSchema(dupRemoval.getOutSchema());
   }
 
-  private static Schema getNaturalJoin(LogicalNode outer, LogicalNode inner) {
-    Schema joinSchema = new Schema();
-    Schema commons = SchemaUtil.getCommons(outer.getOutSchema(),
-        inner.getOutSchema());
-    joinSchema.addColumns(commons);
-    for (Column c : outer.getOutSchema().getColumns()) {
-      for (Column common : commons.getColumns()) {
-        if (!common.getColumnName().equals(c.getColumnName())) {
-          joinSchema.addColumn(c);
-        }
+  private String [] doProjectionPrephase(PlanContext context, Projection projection) throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    int finalTargetNum = projection.size();
+    String [] referenceNames = new String[finalTargetNum];
+    ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[finalTargetNum];
+    NamedExpr namedExpr;
+    for (int i = 0; i < finalTargetNum; i++) {
+      namedExpr = projection.getNamedExprs()[i];
+
+      if (PlannerUtil.existsAggregationFunction(namedExpr)) {
+        block.setAggregationRequire();
       }
+      // dissect an expression into multiple parts (at most dissected into three parts)
+      normalizedExprList[i] = normalizer.normalize(context, namedExpr.getExpr());
     }
 
-    for (Column c : inner.getOutSchema().getColumns()) {
-      for (Column common : commons.getColumns()) {
-        if (!common.getColumnName().equals(c.getColumnName())) {
-          joinSchema.addColumn(c);
-        }
+    // Note: Why separate normalization and add(Named)Expr?
+    //
+    // ExprNormalizer internally makes use of the named exprs in NamedExprsManager.
+    // If we don't separate normalization work and addExprWithName, addExprWithName will find named exprs evaluated
+    // the same logical node. It will cause impossible evaluation in physical executors.
+    for (int i = 0; i < finalTargetNum; i++) {
+      namedExpr = projection.getNamedExprs()[i];
+      // Get all projecting references
+      if (namedExpr.hasAlias()) {
+        NamedExpr aliasedExpr = new NamedExpr(normalizedExprList[i].baseExpr, namedExpr.getAlias());
+        referenceNames[i] = block.namedExprsMgr.addNamedExpr(aliasedExpr);
+      } else {
+        referenceNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
       }
-    }
-    return joinSchema;
-  }
 
-  /*===============================================================================================
-    SET OPERATION SECTION
-   ===============================================================================================*/
+      // Add sub-expressions (i.e., aggregation part and scalar part) from dissected parts.
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
+    }
 
-  @Override
-  public LogicalNode visitUnion(PlanContext context, Stack<OpType> stack, SetOperation setOperation)
-      throws PlanningException {
-    return buildSetPlan(context, stack, setOperation);
+    return referenceNames;
   }
 
-  @Override
-  public LogicalNode visitExcept(PlanContext context, Stack<OpType> stack, SetOperation setOperation)
+  /**
+   * It builds non-from statement (only expressions) like '<code>SELECT 1+3 as plus</code>'.
+   */
+  private EvalExprNode buildPlanForNoneFromStatement(PlanContext context, Stack<Expr> stack, Projection projection)
       throws PlanningException {
-    return buildSetPlan(context, stack, setOperation);
-  }
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
 
-  @Override
-  public LogicalNode visitIntersect(PlanContext context, Stack<OpType> stack, SetOperation setOperation)
-      throws PlanningException {
-    return buildSetPlan(context, stack, setOperation);
+    int finalTargetNum = projection.getNamedExprs().length;
+    Target [] targets = new Target[finalTargetNum];
+
+    for (int i = 0; i < targets.length; i++) {
+      NamedExpr namedExpr = projection.getNamedExprs()[i];
+      EvalNode evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+      if (namedExpr.hasAlias()) {
+        targets[i] = new Target(evalNode, namedExpr.getAlias());
+      } else {
+        targets[i] = new Target(evalNode, context.plan.newGeneratedFieldName(namedExpr.getExpr()));
+      }
+    }
+    EvalExprNode evalExprNode = context.queryBlock.getNodeFromExpr(projection);
+    evalExprNode.setTargets(targets);
+    evalExprNode.setOutSchema(PlannerUtil.targetToSchema(targets));
+    // it's for debugging or unit testing
+    block.setUnresolvedTargets(targets);
+    return evalExprNode;
   }
 
-  private LogicalNode buildSetPlan(PlanContext context, Stack<OpType> stack, SetOperation setOperation)
+  private Target [] buildTargets(LogicalPlan plan, QueryBlock block, String[] referenceNames)
       throws PlanningException {
+    Target [] targets = new Target[referenceNames.length];
+    for (int i = 0; i < referenceNames.length; i++) {
+      if (block.namedExprsMgr.isResolved(referenceNames[i])) {
+        targets[i] = block.namedExprsMgr.getTarget(referenceNames[i]);
+      } else {
+        NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referenceNames[i]);
+        EvalNode evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+        block.namedExprsMgr.resolveExpr(referenceNames[i], evalNode);
+        targets[i] = new Target(evalNode, referenceNames[i]);
+      }
+    }
+    return targets;
+  }
 
-    // 1. Init Phase
+  /**
+   * Insert a group-by operator before a sort or a projection operator.
+   * It is used only when a group-by clause is not given.
+   */
+  private LogicalNode insertGroupbyNodeIfUnresolved(PlanContext context,
+                                                    LogicalNode child,
+                                                    Stack<Expr> stack) throws PlanningException {
     LogicalPlan plan = context.plan;
-    QueryBlock block = context.block;
+    QueryBlock block = context.queryBlock;
 
-    // 2. Build Child Plans
-    PlanContext leftContext = new PlanContext(plan, plan.newNoNameBlock());
-    Stack<OpType> leftStack = new Stack<OpType>();
-    LogicalNode left = visit(leftContext, leftStack, setOperation.getLeft());
-    TableSubQueryNode leftSubQuery = new TableSubQueryNode(plan.newPID(), leftContext.block.getName(), left);
-    context.plan.connectBlocks(leftContext.block, context.block, BlockType.TableSubQuery);
+    if (!block.isAggregationRequired()) {
+      GroupbyNode groupbyNode = new GroupbyNode(plan.newPID());
+      groupbyNode.setGroupingColumns(new Column[] {});
 
-    PlanContext rightContext = new PlanContext(plan, plan.newNoNameBlock());
-    Stack<OpType> rightStack = new Stack<OpType>();
-    LogicalNode right = visit(rightContext, rightStack, setOperation.getRight());
-    TableSubQueryNode rightSubQuery = new TableSubQueryNode(plan.newPID(), rightContext.block.getName(), right);
-    context.plan.connectBlocks(rightContext.block, context.block, BlockType.TableSubQuery);
+      Set<Target> evaluatedTargets = new LinkedHashSet<Target>();
+      boolean includeDistinctFunction = false;
+      for (Iterator<NamedExpr> it = block.namedExprsMgr.getUnresolvedExprs(); it.hasNext();) {
+        NamedExpr rawTarget = it.next();
+        try {
+          includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
+          EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+          if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+            block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
+            evaluatedTargets.add(new Target(evalNode, rawTarget.getAlias()));
+          }
+        } catch (VerifyException ve) {
+        }
+      }
+      groupbyNode.setDistinct(includeDistinctFunction);
+      groupbyNode.setTargets(evaluatedTargets.toArray(new Target[evaluatedTargets.size()]));
+      groupbyNode.setChild(child);
+      groupbyNode.setInSchema(child.getOutSchema());
 
-    BinaryNode setOp;
-    if (setOperation.getType() == OpType.Union) {
-      setOp = new UnionNode(plan.newPID(), leftSubQuery, rightSubQuery);
-    } else if (setOperation.getType() == OpType.Except) {
-      setOp = new ExceptNode(plan.newPID(), leftSubQuery, rightSubQuery);
-    } else if (setOperation.getType() == OpType.Intersect) {
-      setOp = new IntersectNode(plan.newPID(), leftSubQuery, rightSubQuery);
+      block.registerNode(groupbyNode); // this inserted group-by node doesn't pass through preprocessor. So manually added.
+      postHook(context, stack, null, groupbyNode);
+      return groupbyNode;
     } else {
-      throw new VerifyException("Invalid Type: " + setOperation.getType());
+      return child;
     }
+  }
 
-    // Strip the table names from the targets of the both blocks
-    // in order to check the equivalence the schemas of both blocks.
-    Target [] leftStrippedTargets = PlannerUtil.stripTarget(leftContext.block.getCurrentTargets());
+  /*===============================================================================================
+    SORT SECTION
+  ===============================================================================================*/
+  @Override
+  public LimitNode visitLimit(PlanContext context, Stack<Expr> stack, Limit limit) throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    EvalNode firstFetNum;
+    LogicalNode child;
+    if (limit.getFetchFirstNum().getType() == OpType.Literal) {
+      firstFetNum = exprAnnotator.createEvalNode(context.plan, block, limit.getFetchFirstNum());
+
+      ////////////////////////////////////////////////////////
+      // Visit and Build Child Plan
+      ////////////////////////////////////////////////////////
+      stack.push(limit);
+      child = visit(context, stack, limit.getChild());
+      stack.pop();
+      ////////////////////////////////////////////////////////
+    } else {
+      ExprNormalizedResult normalizedResult = normalizer.normalize(context, limit.getFetchFirstNum());
+      String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
+      block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);
+
+      ////////////////////////////////////////////////////////
+      // Visit and Build Child Plan
+      ////////////////////////////////////////////////////////
+      stack.push(limit);
+      child = visit(context, stack, limit.getChild());
+      stack.pop();
+      ////////////////////////////////////////////////////////
 
-    Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
-    setOp.setInSchema(leftSubQuery.getOutSchema());
-    setOp.setOutSchema(outSchema);
 
-    if (isNoUpperProjection(stack)) {
-      block.targetListManager = new TargetListManager(plan, leftStrippedTargets);
-      block.targetListManager.resolveAll();
-      block.setSchema(block.targetListManager.getUpdatedSchema());
+      // Consider the following query:
+      //
+      // SELECT one + two AS total ... FROM ... LIMIT total + 10;
+      //
+      // "total + 10" cannot be resolved in ScanNode. "total + 10" should be resolved in some of the upper nodes.
+      // So, we have to check if the reference is resolved. If not, it annotates the expression here.
+      if (block.namedExprsMgr.isResolved(referName)) {
+        firstFetNum = block.namedExprsMgr.getTarget(referName).getEvalTree();
+      } else {
+        NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
+        firstFetNum = exprAnnotator.createEvalNode(context.plan, block, namedExpr.getExpr());
+        block.namedExprsMgr.resolveExpr(referName, firstFetNum);
+      }
     }
 
-    return setOp;
+    LimitNode limitNode = block.getNodeFromExpr(limit);
+    limitNode.setFetchFirst(firstFetNum.terminate(null).asInt8());
+    limitNode.setChild(child);
+    limitNode.setInSchema(child.getOutSchema());
+    limitNode.setOutSchema(child.getOutSchema());
+    return limitNode;
   }
 
   @Override
-  public SelectionNode visitFilter(PlanContext context, Stack<OpType> stack, Selection selection)
-      throws PlanningException {
-    // 1. init phase:
-    LogicalPlan plan = context.plan;
-    QueryBlock block = context.block;
+  public SortNode visitSort(PlanContext context, Stack<Expr> stack, Sort sort) throws PlanningException {
+    QueryBlock block = context.queryBlock;
 
-    // 1.1 finding pushable expressions from search condition
+    int sortKeyNum = sort.getSortSpecs().length;
+    Sort.SortSpec[] sortSpecs = sort.getSortSpecs();
+    String [] referNames = new String[sortKeyNum];
 
-    // 2. build child plans:
-    stack.push(OpType.Filter);
-    LogicalNode child = visit(context, stack, selection.getChild());
+    ExprNormalizedResult [] normalizedExprList = new ExprNormalizedResult[sortKeyNum];
+    for (int i = 0; i < sortKeyNum; i++) {
+      normalizedExprList[i] = normalizer.normalize(context, sortSpecs[i].getKey());
+    }
+    for (int i = 0; i < sortKeyNum; i++) {
+      referNames[i] = block.namedExprsMgr.addExpr(normalizedExprList[i].baseExpr);
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedExprList[i].scalarExprs);
+    }
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(sort);
+    LogicalNode child = visit(context, stack, sort.getChild());
+    child = insertGroupbyNodeIfUnresolved(context, child, stack);
     stack.pop();
+    ////////////////////////////////////////////////////////
 
-    // 3. build this plan:
-    EvalNode searchCondition = createEvalTree(plan, block, selection.getQual());
-    EvalNode simplified = AlgebraicUtil.eliminateConstantExprs(searchCondition);
-    SelectionNode selectionNode = new SelectionNode(plan.newPID(), simplified);
+    // Building sort keys
+    Column column;
+    SortSpec [] annotatedSortSpecs = new SortSpec[sortKeyNum];
+    for (int i = 0; i < sortKeyNum; i++) {
+      if (block.namedExprsMgr.isResolved(referNames[i])) {
+        column = block.namedExprsMgr.getTarget(referNames[i]).getNamedColumn();
+      } else {
+        throw new IllegalStateException("Unexpected State: " + TUtil.arrayToString(sortSpecs));
+      }
+      annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(), sortSpecs[i].isNullFirst());
+    }
 
-    // 4. set child plan, update input/output schemas:
-    selectionNode.setChild(child);
-    selectionNode.setInSchema(child.getOutSchema());
-    selectionNode.setOutSchema(child.getOutSchema());
+    SortNode sortNode = block.getNodeFromExpr(sort);
+    sortNode.setSortSpecs(annotatedSortSpecs);
 
-    // 5. update block information:
-    block.setSelectionNode(selectionNode);
+    // 4. Set Child Plan, Update Input/Output Schemas:
+    sortNode.setChild(child);
+    sortNode.setInSchema(child.getOutSchema());
+    sortNode.setOutSchema(child.getOutSchema());
 
-    return selectionNode;
+    return sortNode;
   }
 
   /*===============================================================================================
@@ -435,133 +499,130 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    ===============================================================================================*/
 
   @Override
-  public LogicalNode visitGroupBy(PlanContext context, Stack<OpType> stack, Aggregation aggregation)
+  public LogicalNode visitHaving(PlanContext context, Stack<Expr> stack, Having expr) throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    ExprNormalizedResult normalizedResult = normalizer.normalize(context, expr.getQual());
+    String referName = block.namedExprsMgr.addExpr(normalizedResult.baseExpr);
+    block.namedExprsMgr.addNamedExprArray(normalizedResult.aggExprs);
+    block.namedExprsMgr.addNamedExprArray(normalizedResult.scalarExprs);
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(expr);
+    LogicalNode child = visit(context, stack, expr.getChild());
+    stack.pop();
+    ////////////////////////////////////////////////////////
+
+    // Consider the following query:
+    //
+    // SELECT sum(avg) AS total ... FROM ... HAVING total > 2;
+    //
+    // "total" cannot be resolved in GroupByNode. "total" should be resolved in the upper nodes.
+    // So, we have to check if the reference is resolved. If not, it annotates the expression here.
+    EvalNode havingCondition;
+    if (block.namedExprsMgr.isResolved(referName)) {
+      havingCondition = block.namedExprsMgr.getTarget(referName).getEvalTree();
+    } else {
+      NamedExpr namedExpr = block.namedExprsMgr.getNamedExpr(referName);
+      havingCondition = exprAnnotator.createEvalNode(context.plan, block, namedExpr.getExpr());
+      block.namedExprsMgr.resolveExpr(referName, havingCondition);
+    }
+
+    HavingNode having = new HavingNode(context.plan.newPID());
+    having.setQual(havingCondition);
+    having.setInSchema(child.getOutSchema());
+    having.setOutSchema(child.getOutSchema());
+    having.setChild(child);
+    return having;
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(PlanContext context, Stack<Expr> stack, Aggregation aggregation)
       throws PlanningException {
 
-    // 1. Initialization Phase:
+    // Initialization Phase:
     LogicalPlan plan = context.plan;
-    QueryBlock block = context.block;
+    QueryBlock block = context.queryBlock;
+
+    // Normalize grouping keys and add normalized grouping keys to NamedExprManager
+    int groupingKeyNum = aggregation.getGroupSet()[0].getGroupingSets().length;
+    ExprNormalizedResult [] normalizedResults = new ExprNormalizedResult[groupingKeyNum];
+    for (int i = 0; i < groupingKeyNum; i++) {
+      Expr groupingKey = aggregation.getGroupSet()[0].getGroupingSets()[i];
+      normalizedResults[i] = normalizer.normalize(context, groupingKey);
+    }
+    String [] groupingKeyRefNames = new String[groupingKeyNum];
+    for (int i = 0; i < groupingKeyNum; i++) {
+      groupingKeyRefNames[i] = block.namedExprsMgr.addExpr(normalizedResults[i].baseExpr);
+      block.namedExprsMgr.addNamedExprArray(normalizedResults[i].aggExprs);
+      block.namedExprsMgr.addNamedExprArray(normalizedResults[i].scalarExprs);
+    }
 
-    // 2. Build Child Plan Phase:
-    stack.push(OpType.Aggregation);
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(aggregation);
     LogicalNode child = visit(context, stack, aggregation.getChild());
     stack.pop();
+    ////////////////////////////////////////////////////////
+
+    Set<Target> evaluatedTargets = new LinkedHashSet<Target>();
+    boolean includeDistinctFunction = false;
+    for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
+      try {
+        includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+        if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+          block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
+          evaluatedTargets.add(new Target(evalNode, rawTarget.getAlias()));
+        }
+      } catch (VerifyException ve) {
+      }
+    }
 
     // 3. Build This Plan:
     Aggregation.GroupElement [] groupElements = aggregation.getGroupSet();
 
     if (groupElements[0].getType() == GroupType.OrdinaryGroup) { // for group-by
-      GroupElement annotatedElements [] = new GroupElement[groupElements.length];
-      for (int i = 0; i < groupElements.length; i++) {
-        annotatedElements[i] = new GroupElement(
-            groupElements[i].getType(),
-            annotateGroupingColumn(plan, block, groupElements[i].getColumns(), null));
+      Column [] groupingColumns = new Column[aggregation.getGroupSet()[0].getGroupingSets().length];
+      for (int i = 0; i < groupingColumns.length; i++) {
+        if (block.namedExprsMgr.isResolved(groupingKeyRefNames[i])) {
+          groupingColumns[i] = block.namedExprsMgr.getTarget(groupingKeyRefNames[i]).getNamedColumn();
+        } else {
+          throw new PlanningException("Each grouping column expression must be a scalar expression.");
+        }
       }
-      GroupbyNode groupingNode = new GroupbyNode(plan.newPID(), annotatedElements[0].getColumns());
-      if (aggregation.hasHavingCondition()) {
-        groupingNode.setHavingCondition(
-            createEvalTree(plan, block, aggregation.getHavingCondition()));
+
+      GroupbyNode groupingNode = context.queryBlock.getNodeFromExpr(aggregation);
+      groupingNode.setGroupingColumns(groupingColumns);
+      groupingNode.setDistinct(includeDistinctFunction);
+
+      List<Target> targets = new ArrayList<Target>();
+
+      for (Column column : groupingColumns) {
+        if (child.getOutSchema().contains(column)) {
+          targets.add(new Target(new FieldEval(child.getOutSchema().getColumn(column))));
+        }
       }
 
+      targets.addAll(evaluatedTargets);
+      groupingNode.setTargets(targets.toArray(new Target[targets.size()]));
       // 4. Set Child Plan and Update Input Schemes Phase
       groupingNode.setChild(child);
-      block.setGroupbyNode(groupingNode);
+      block.registerNode(groupingNode);
       groupingNode.setInSchema(child.getOutSchema());
 
       // 5. Update Output Schema and Targets for Upper Plan
 
       return groupingNode;
 
-    } else if (groupElements[0].getType() == GroupType.Cube) { // for cube by
-      List<Column[]> cuboids  = generateCuboids(annotateGroupingColumn(plan, block,
-          groupElements[0].getColumns(), null));
-      UnionNode topUnion = createGroupByUnion(plan, block, child, cuboids, 0);
-      block.resolveGroupingRequired();
-      block.getTargetListManager().resolveAll();
-
-      return topUnion;
     } else {
       throw new InvalidQueryException("Not support grouping");
     }
   }
 
-  private UnionNode createGroupByUnion(final LogicalPlan plan,
-                                       final QueryBlock block,
-                                       final LogicalNode subNode,
-                                       final List<Column[]> cuboids,
-                                       final int idx) {
-    UnionNode union;
-    try {
-      if ((cuboids.size() - idx) > 2) {
-        GroupbyNode g1 = new GroupbyNode(plan.newPID(), cuboids.get(idx));
-        Target[] clone = cloneTargets(block.getCurrentTargets());
-
-        g1.setTargets(clone);
-        g1.setChild((LogicalNode) subNode.clone());
-        g1.setInSchema(g1.getChild().getOutSchema());
-        Schema outSchema = getProjectedSchema(plan, block.getCurrentTargets());
-        g1.setOutSchema(outSchema);
-
-        LogicalNode right = createGroupByUnion(plan, block, subNode, cuboids, idx+1);
-        union = new UnionNode(plan.newPID(), g1, right);
-        union.setInSchema(g1.getOutSchema());
-        union.setOutSchema(g1.getOutSchema());
-
-        return union;
-      } else {
-        GroupbyNode g1 = new GroupbyNode(plan.newPID(), cuboids.get(idx));
-        Target[] clone = cloneTargets(block.getCurrentTargets());
-        g1.setTargets(clone);
-        g1.setChild((LogicalNode) subNode.clone());
-        g1.setInSchema(g1.getChild().getOutSchema());
-        Schema outSchema = getProjectedSchema(plan, clone);
-        g1.setOutSchema(outSchema);
-
-        GroupbyNode g2 = new GroupbyNode(plan.newPID(), cuboids.get(idx+1));
-        clone = cloneTargets(block.getCurrentTargets());
-        g2.setTargets(clone);
-        g2.setChild((LogicalNode) subNode.clone());
-        g2.setInSchema(g1.getChild().getOutSchema());
-        outSchema = getProjectedSchema(plan, clone);
-        g2.setOutSchema(outSchema);
-        union = new UnionNode(plan.newPID(), g1, g2);
-        union.setInSchema(g1.getOutSchema());
-        union.setOutSchema(g1.getOutSchema());
-
-        return union;
-      }
-    } catch (CloneNotSupportedException cnse) {
-      LOG.error(cnse);
-      throw new InvalidQueryException(cnse);
-    }
-  }
-
-  /**
-   * It transforms a list of column references into a list of annotated columns with considering aliased expressions.
-   */
-  private Column[] annotateGroupingColumn(LogicalPlan plan, QueryBlock block,
-                                           ColumnReferenceExpr[] columnRefs, LogicalNode child)
-      throws PlanningException {
-
-    Column[] columns = new Column[columnRefs.length];
-    for (int i = 0; i < columnRefs.length; i++) {
-      columns[i] = plan.resolveColumn(block, null, columnRefs[i]);
-      columns[i] = plan.getColumnOrAliasedColumn(block, columns[i]);
-    }
-
-    return columns;
-  }
-
-  private static Target[] cloneTargets(Target[] sourceTargets)
-      throws CloneNotSupportedException {
-    Target[] clone = new Target[sourceTargets.length];
-    for (int i = 0; i < sourceTargets.length; i++) {
-      clone[i] = (Target) sourceTargets[i].clone();
-    }
-
-    return clone;
-  }
-
   public static final Column[] ALL= Lists.newArrayList().toArray(new Column[0]);
 
   public static List<Column[]> generateCuboids(Column[] columns) {
@@ -585,160 +646,460 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return cube;
   }
 
+  @Override
+  public SelectionNode visitFilter(PlanContext context, Stack<Expr> stack, Selection selection)
+      throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    ExprNormalizedResult normalizedResult = normalizer.normalize(context, selection.getQual());
+    block.namedExprsMgr.addReferences(normalizedResult.baseExpr);
+    if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
+      throw new VerifyException("Filter condition cannot include aggregation function");
+    }
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(selection);
+    LogicalNode child = visit(context, stack, selection.getChild());
+    stack.pop();
+    ////////////////////////////////////////////////////////
+
+    // Create EvalNode for a search condition.
+    EvalNode searchCondition = exprAnnotator.createEvalNode(context.plan, block, selection.getQual());
+    EvalNode simplified = AlgebraicUtil.eliminateConstantExprs(searchCondition);
+
+    SelectionNode selectionNode = context.queryBlock.getNodeFromExpr(selection);
+    selectionNode.setQual(simplified);
+    selectionNode.setChild(child);
+    selectionNode.setInSchema(child.getOutSchema());
+    selectionNode.setOutSchema(child.getOutSchema());
+
+    // 5. update block information:
+    block.registerNode(selectionNode);
+
+    return selectionNode;
+  }
+
   /*===============================================================================================
-    SORT SECTION
+    JOIN SECTION
    ===============================================================================================*/
 
   @Override
-  public SortNode visitSort(PlanContext context, Stack<OpType> stack, Sort sort) throws PlanningException {
-
-    // 1. Initialization Phase:
+  public LogicalNode visitJoin(PlanContext context, Stack<Expr> stack, Join join)
+      throws PlanningException {
+    // Phase 1: Init
     LogicalPlan plan = context.plan;
-    QueryBlock block = context.block;
+    QueryBlock block = context.queryBlock;
 
-    // 2. Build Child Plans:
-    stack.push(OpType.Sort);
-    LogicalNode child = visit(context, stack, sort.getChild());
-    child = insertGroupbyNodeIfUnresolved(plan, block, child, stack);
+    if (join.hasQual()) {
+      ExprNormalizedResult normalizedResult = normalizer.normalize(context, join.getQual());
+      block.namedExprsMgr.addReferences(normalizedResult.baseExpr);
+      if (normalizedResult.aggExprs.size() > 0 || normalizedResult.scalarExprs.size() > 0) {
+        throw new VerifyException("Filter condition cannot include aggregation function");
+      }
+    }
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
+    stack.push(join);
+    LogicalNode left = visit(context, stack, join.getLeft());
+    LogicalNode right = visit(context, stack, join.getRight());
     stack.pop();
+    ////////////////////////////////////////////////////////
 
-    // 3. Build this plan:
-    SortSpec[] annotatedSortSpecs = new SortSpec[sort.getSortSpecs().length];
-    Column column;
-    Sort.SortSpec[] sortSpecs = sort.getSortSpecs();
-    for (int i = 0; i < sort.getSortSpecs().length; i++) {
-      column = plan.resolveColumn(block, null, sortSpecs[i].getKey());
-      column = plan.getColumnOrAliasedColumn(block, column);
-      annotatedSortSpecs[i] = new SortSpec(column, sortSpecs[i].isAscending(), sortSpecs[i].isNullFirst());
+
+    JoinNode joinNode = context.queryBlock.getNodeFromExpr(join);
+    joinNode.setJoinType(join.getJoinType());
+    joinNode.setLeftChild(left);
+    joinNode.setRightChild(right);
+
+    // Set A merged input schema
+    Schema merged;
+    if (join.isNatural()) {
+      merged = getNaturalJoinSchema(left, right);
+    } else {
+      merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
     }
-    SortNode sortNode = new SortNode(context.plan.newPID(), annotatedSortSpecs);
+    joinNode.setInSchema(merged);
 
-    // 4. Set Child Plan, Update Input/Output Schemas:
-    sortNode.setChild(child);
-    sortNode.setInSchema(child.getOutSchema());
-    sortNode.setOutSchema(child.getOutSchema());
+    // Create EvalNode for a search condition.
+    EvalNode joinCondition = null;
+    if (join.hasQual()) {
+      EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, block, join.getQual());
+      joinCondition = AlgebraicUtil.eliminateConstantExprs(evalNode);
+    }
 
-    return sortNode;
+    EvalNode evalNode;
+    List<Expr> newlyEvaluatedExprs = TUtil.newList();
+    for (Iterator<NamedExpr> it = block.namedExprsMgr.getUnresolvedExprs(); it.hasNext();) {
+      NamedExpr namedExpr = it.next();
+      try {
+        evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, evalNode, joinNode, stack.peek().getType() != OpType.Join)) {
+          block.namedExprsMgr.resolveExpr(namedExpr.getAlias(), evalNode);
+          newlyEvaluatedExprs.add(namedExpr.getExpr());
+        }
+      } catch (VerifyException ve) {}
+    }
+
+    List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(merged));
+
+    for (Expr newAddedExpr : newlyEvaluatedExprs) {
+      targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
+    }
+    joinNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+    // Determine join conditions
+    if (join.isNatural()) { // if natural join, it should have the equi-join conditions by common column names
+      Schema leftSchema = joinNode.getLeftChild().getInSchema();
+      Schema rightSchema = joinNode.getRightChild().getInSchema();
+      Schema commons = SchemaUtil.getNaturalJoinColumns(leftSchema, rightSchema);
+      EvalNode njCond = getNaturalJoinCondition(leftSchema, rightSchema, commons);
+      joinNode.setJoinQual(njCond);
+    } else if (join.hasQual()) { // otherwise, the given join conditions are set
+      joinNode.setJoinQual(joinCondition);
+    }
+
+    return joinNode;
+  }
+
+  private static Schema getNaturalJoinSchema(LogicalNode left, LogicalNode right) {
+    Schema joinSchema = new Schema();
+    Schema commons = SchemaUtil.getNaturalJoinColumns(left.getOutSchema(), right.getOutSchema());
+    joinSchema.addColumns(commons);
+    for (Column c : left.getOutSchema().getColumns()) {
+      if (!joinSchema.contains(c.getQualifiedName())) {
+        joinSchema.addColumn(c);
+      }
+    }
+
+    for (Column c : right.getOutSchema().getColumns()) {
+      if (!joinSchema.contains(c.getQualifiedName())) {
+        joinSchema.addColumn(c);
+      }
+    }
+    return joinSchema;
+  }
+
+  private static EvalNode getNaturalJoinCondition(Schema outer, Schema inner, Schema commons) {
+    EvalNode njQual = null;
+    EvalNode equiQual;
+
+    Column leftJoinKey;
+    Column rightJoinKey;
+    for (Column common : commons.getColumns()) {
+      leftJoinKey = outer.getColumnByName(common.getColumnName());
+      rightJoinKey = inner.getColumnByName(common.getColumnName());
+      equiQual = new BinaryEval(EvalType.EQUAL,
+          new FieldEval(leftJoinKey), new FieldEval(rightJoinKey));
+      if (njQual == null) {
+        njQual = equiQual;
+      } else {
+        njQual = new BinaryEval(EvalType.AND, njQual, equiQual);
+      }
+    }
+
+    return njQual;
+  }
+
+  private LogicalNode createCartesianProduct(PlanContext context, LogicalNode left, LogicalNode right)
+      throws PlanningException {
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    EvalNode evalNode;
+    List<Expr> newlyEvaluatedExprs = TUtil.newList();
+    for (Iterator<NamedExpr> it = block.namedExprsMgr.getUnresolvedExprs(); it.hasNext();) {
+      NamedExpr namedExpr = it.next();
+      try {
+        evalNode = exprAnnotator.createEvalNode(plan, block, namedExpr.getExpr());
+        if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() == 0) {
+          block.namedExprsMgr.resolveExpr(namedExpr.getAlias(), evalNode);
+          newlyEvaluatedExprs.add(namedExpr.getExpr());
+        }
+      } catch (VerifyException ve) {}
+    }
+
+    Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
+    List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(merged));
+    for (Expr newAddedExpr : newlyEvaluatedExprs) {
+      targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
+    }
+
+    JoinNode join = new JoinNode(plan.newPID(), JoinType.CROSS, left, right);
+    join.setTargets(targets.toArray(new Target[targets.size()]));
+    join.setInSchema(merged);
+    return join;
+  }
+
+  @Override
+  public LogicalNode visitRelationList(PlanContext context, Stack<Expr> stack, RelationList relations)
+      throws PlanningException {
+
+    LogicalNode current = visit(context, stack, relations.getRelations()[0]);
+
+    LogicalNode left;
+    LogicalNode right;
+    if (relations.size() > 1) {
+
+      for (int i = 1; i < relations.size(); i++) {
+        left = current;
+        right = visit(context, stack, relations.getRelations()[i]);
+        current = createCartesianProduct(context, left, right);
+      }
+    }
+    context.queryBlock.registerNode(current);
+
+    return current;
+  }
+
+  @Override
+  public ScanNode visitRelation(PlanContext context, Stack<Expr> stack, Relation expr)
+      throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    ScanNode scanNode = block.getNodeFromExpr(expr);
+    updatePhysicalInfo(scanNode.getTableDesc());
+
+    // Add additional expressions required in upper nodes.
+    Set<Expr> newlyEvaluatedExprs = new LinkedHashSet<Expr>();
+    for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
+      try {
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+        if (checkIfBeEvaluatedAtRelation(block, evalNode, scanNode)) {
+          block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
+          newlyEvaluatedExprs.add(rawTarget.getExpr()); // newly added expr
+        }
+      } catch (VerifyException ve) {
+      }
+    }
+
+    // Assume that each unique expr is evaluated once.
+    List<Target> targets = new ArrayList<Target>();
+    for (Column column : scanNode.getInSchema().getColumns()) {
+      ColumnReferenceExpr columnRef = new ColumnReferenceExpr(column.getQualifier(), column.getColumnName());
+      if (block.namedExprsMgr.contains(columnRef)) {
+        String referenceName = block.namedExprsMgr.getName(columnRef);
+        targets.add(new Target(new FieldEval(column), referenceName));
+        newlyEvaluatedExprs.remove(columnRef);
+      } else {
+        targets.add(new Target(new FieldEval(column)));
+      }
+    }
+
+    for (Expr newAddedExpr : newlyEvaluatedExprs) {
+      targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
+    }
+
+    scanNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+    return scanNode;
+  }
+
+  private void updatePhysicalInfo(TableDesc desc) {
+    if (desc.getPath() != null) {
+      try {
+        FileSystem fs = desc.getPath().getFileSystem(new Configuration());
+        FileStatus status = fs.getFileStatus(desc.getPath());
+        if (desc.getStats() != null && (status.isDirectory() || status.isFile())) {
+          ContentSummary summary = fs.getContentSummary(desc.getPath());
+          if (summary != null) {
+            long volume = summary.getLength();
+            desc.getStats().setNumBytes(volume);
+          }
+        }
+      } catch (Throwable t) {
+        LOG.warn(t);
+      }
+    }
   }
 
-  @Override
-  public LimitNode visitLimit(PlanContext context, Stack<OpType> stack, Limit limit) throws PlanningException {
-    // 1. Init Phase:
-    LogicalPlan plan = context.plan;
-    QueryBlock block = context.block;
+  public TableSubQueryNode visitTableSubQuery(PlanContext context, Stack<Expr> stack, TablePrimarySubQuery expr)
+      throws PlanningException {
+    QueryBlock block = context.queryBlock;
+
+    QueryBlock childBlock = context.plan.getBlock(context.plan.getBlockNameByExpr(expr.getSubQuery()));
+    PlanContext newContext = new PlanContext(context, childBlock);
+    LogicalNode child = visit(newContext, new Stack<Expr>(), expr.getSubQuery());
+    TableSubQueryNode subQueryNode = context.queryBlock.getNodeFromExpr(expr);
+    context.plan.connectBlocks(childBlock, context.queryBlock, BlockType.TableSubQuery);
+    subQueryNode.setSubQuery(child);
+
+    // Add additional expressions required in upper nodes.
+    Set<Expr> newlyEvaluatedExprs = new LinkedHashSet<Expr>();
+    for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
+      try {
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
+        if (checkIfBeEvaluatedAtRelation(block, evalNode, subQueryNode)) {
+          block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
+          newlyEvaluatedExprs.add(rawTarget.getExpr()); // newly added expr
+        }
+      } catch (VerifyException ve) {
+      }
+    }
 
-    // build child plans
-    stack.push(OpType.Limit);
-    LogicalNode child = visit(context, stack, limit.getChild());
-    stack.pop();
+    // Assume that each unique expr is evaluated once.
+    List<Target> targets = new ArrayList<Target>();
+    for (Column column : subQueryNode.getInSchema().getColumns()) {
+      ColumnReferenceExpr columnRef = new ColumnReferenceExpr(column.getQualifier(), column.getColumnName());
+      if (block.namedExprsMgr.contains(columnRef)) {
+        String referenceName = block.namedExprsMgr.getName(columnRef);
+        targets.add(new Target(new FieldEval(column), referenceName));
+        newlyEvaluatedExprs.remove(columnRef);
+      } else {
+        targets.add(new Target(new FieldEval(column)));
+      }
+    }
 
-    // build limit plan
-    EvalNode firstFetchNum = createEvalTree(plan, block, limit.getFetchFirstNum());
-    firstFetchNum.eval(null, null, null);
-    LimitNode limitNode = new LimitNode(context.plan.newPID(), firstFetchNum.terminate(null).asInt8());
+    for (Expr newAddedExpr : newlyEvaluatedExprs) {
+      targets.add(block.namedExprsMgr.getTarget(newAddedExpr, true));
+    }
 
-    // set child plan and update input/output schemas.
-    limitNode.setChild(child);
-    limitNode.setInSchema(child.getOutSchema());
-    limitNode.setOutSchema(child.getOutSchema());
-    return limitNode;
+    subQueryNode.setTargets(targets.toArray(new Target[targets.size()]));
+
+    return subQueryNode;
   }
 
-  /*===============================================================================================
-    PROJECTION SECTION
+    /*===============================================================================================
+    SET OPERATION SECTION
    ===============================================================================================*/
 
   @Override
-  public LogicalNode visitProjection(PlanContext context, Stack<OpType> stack, Projection projection)
+  public LogicalNode visitUnion(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
       throws PlanningException {
+    return buildSetPlan(context, stack, setOperation);
+  }
 
-    //1: init Phase
-    LogicalPlan plan = context.plan;
-    QueryBlock block = context.block;
+  @Override
+  public LogicalNode visitExcept(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
+      throws PlanningException {
+    return buildSetPlan(context, stack, setOperation);
+  }
 
-    if (!projection.isAllProjected()) {
-      block.targetListManager = new TargetListManager(plan, projection);
-    }
+  @Override
+  public LogicalNode visitIntersect(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
+      throws PlanningException {
+    return buildSetPlan(context, stack, setOperation);
+  }
 
-    if (!projection.hasChild()) {
-      EvalExprNode evalOnly =
-          new EvalExprNode(context.plan.newPID(), annotateTargets(plan, block, projection.getTargets()));
-      evalOnly.setOutSchema(getProjectedSchema(plan, evalOnly.getExprs()));
-      block.setProjectionNode(evalOnly);
-      for (int i = 0; i < evalOnly.getTargets().length; i++) {
-        block.targetListManager.fill(i, evalOnly.getTargets()[i]);
-      }
-      return evalOnly;
-    }
+  private LogicalNode buildSetPlan(PlanContext context, Stack<Expr> stack, SetOperation setOperation)
+      throws PlanningException {
 
-    // 2: Build Child Plans
-    stack.push(OpType.Projection);
-    LogicalNode child = visit(context, stack, projection.getChild());
-    child = insertGroupbyNodeIfUnresolved(plan, block, child, stack);
+    // 1. Init Phase
+    LogicalPlan plan = context.plan;
+    QueryBlock block = context.queryBlock;
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Left Child Plan
+    ////////////////////////////////////////////////////////
+    QueryBlock leftBlock = context.plan.getBlockByExpr(setOperation.getLeft());
+    PlanContext leftContext = new PlanContext(context, leftBlock);
+    stack.push(setOperation);
+    LogicalNode leftChild = visit(leftContext, new Stack<Expr>(), setOperation.getLeft());
     stack.pop();
+    // Connect left child and current blocks
+    context.plan.connectBlocks(leftContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Right Child Plan
+    ////////////////////////////////////////////////////////
+    QueryBlock rightBlock = context.plan.getBlockByExpr(setOperation.getRight());
+    PlanContext rightContext = new PlanContext(context, rightBlock);
+    stack.push(setOperation);
+    LogicalNode rightChild = visit(rightContext, new Stack<Expr>(), setOperation.getRight());
+    stack.pop();
+    // Connect right child and current blocks
+    context.plan.connectBlocks(rightContext.queryBlock, context.queryBlock, BlockType.TableSubQuery);
 
-    // All targets must be evaluable before the projection.
-    Preconditions.checkState(block.getTargetListManager().isAllResolved(),
-        "Some targets cannot be evaluated in the query block \"%s\"", block.getName());
-
-    ProjectionNode projectionNode;
-    if (projection.isAllProjected()) {
-      projectionNode = new ProjectionNode(context.plan.newPID(), PlannerUtil.schemaToTargets(child.getOutSchema()));
+    BinaryNode setOp;
+    if (setOperation.getType() == OpType.Union) {
+      setOp = new UnionNode(plan.newPID());
+    } else if (setOperation.getType() == OpType.Except) {
+      setOp = new ExceptNode(plan.newPID(), leftChild, rightChild);
+    } else if (setOperation.getType() == OpType.Intersect) {
+      setOp = new IntersectNode(plan.newPID(), leftChild, rightChild);
     } else {
-      projectionNode = new ProjectionNode(context.plan.newPID(), block.getCurrentTargets());
+      throw new VerifyException("Invalid Type: " + setOperation.getType());
     }
+    setOp.setLeftChild(leftChild);
+    setOp.setRightChild(rightChild);
 
-    block.setProjectionNode(projectionNode);
-    projectionNode.setOutSchema(getProjectedSchema(plan, projectionNode.getTargets()));
-    projectionNode.setInSchema(child.getOutSchema());
-    projectionNode.setChild(child);
+    // An union statement can be derived from two query blocks.
+    // For one union statement between both relations, we can ensure that each corresponding data domain of both
+    // relations are the same. However, if necessary, the schema of left query block will be used as a base schema.
+    Target [] leftStrippedTargets = PlannerUtil.stripTarget(
+        PlannerUtil.schemaToTargets(leftBlock.getRoot().getOutSchema()));
 
-    if (projection.isDistinct() && block.hasGrouping()) {
-      throw new VerifyException("Cannot support grouping and distinct at the same time");
-    } else {
-      if (projection.isDistinct()) {
-        Schema outSchema = projectionNode.getOutSchema();
-        GroupbyNode dupRemoval = new GroupbyNode(plan.newPID(), outSchema.toArray());
-        dupRemoval.setTargets(block.getTargetListManager().getTargets());
-        dupRemoval.setInSchema(child.getOutSchema());
-        dupRemoval.setOutSchema(outSchema);
-        dupRemoval.setChild(child);
-        projectionNode.setChild(dupRemoval);
-      }
-    }
+    Schema outSchema = PlannerUtil.targetToSchema(leftStrippedTargets);
+    setOp.setInSchema(leftChild.getOutSchema());
+    setOp.setOutSchema(outSchema);
 
-    return projectionNode;
+    return setOp;
   }
 
-  /**
-   * Insert a group-by operator before a sort or a projection operator.
-   * It is used only when a group-by clause is not given.
-   */
-  private LogicalNode insertGroupbyNodeIfUnresolved(LogicalPlan plan, QueryBlock block,
-                                                    LogicalNode child, Stack<OpType> stack) throws PlanningException {
+  /*===============================================================================================
+    INSERT SECTION
+   ===============================================================================================*/
 
-    if (!block.isGroupingResolved()) {
-      GroupbyNode groupbyNode = new GroupbyNode(plan.newPID(), new Column[] {});
-      groupbyNode.setTargets(block.getCurrentTargets());
-      groupbyNode.setChild(child);
-      groupbyNode.setInSchema(child.getOutSchema());
+  public LogicalNode visitInsert(PlanContext context, Stack<Expr> stack, Insert expr) throws PlanningException {
+    stack.push(expr);
+    QueryBlock newQueryBlock = context.plan.getBlockByExpr(expr.getSubQuery());
+    PlanContext newContext = new PlanContext(context, newQueryBlock);
+    Stack<Expr> subStack = new Stack<Expr>();
+    LogicalNode subQuery = visit(newContext, subStack, expr.getSubQuery());
+    context.plan.connectBlocks(newQueryBlock, context.queryBlock, BlockType.TableSubQuery);
+    stack.pop();
 
-      block.postVisit(groupbyNode, stack);
-      block.checkAndResolveTargets(groupbyNode);
-      return groupbyNode;
-    } else {
-      return child;
+    InsertNode insertNode = null;
+    if (expr.hasTableName()) {
+      TableDesc desc = catalog.getTableDesc(expr.getTableName());
+      context.queryBlock.addRelation(new ScanNode(context.plan.newPID(), desc));
+
+      Schema targetSchema = new Schema();
+      if (expr.hasTargetColumns()) {
+        // INSERT OVERWRITE INTO TABLE tbl(col1 type, col2 type) SELECT ...
+        String [] targetColumnNames = expr.getTargetColumns();
+        for (int i = 0; i < targetColumnNames.length; i++) {
+          Column targetColumn = context.plan.resolveColumn(context.queryBlock,
+              new ColumnReferenceExpr(targetColumnNames[i]));
+          targetSchema.addColumn(targetColumn);
+        }
+      } else {
+        // use the output schema of select clause as target schema
+        // if didn't specific target columns like the way below,
+        // INSERT OVERWRITE INTO TABLE tbl SELECT ...
+        Schema targetTableSchema = desc.getSchema();
+        for (int i = 0; i < subQuery.getOutSchema().getColumnNum(); i++) {
+          targetSchema.addColumn(targetTableSchema.getColumn(i));
+        }
+      }
+
+      insertNode = context.queryBlock.getNodeFromExpr(expr);
+      insertNode.setTargetTableDesc(desc);
+      insertNode.setSubQuery(subQuery);
+      insertNode.setTargetSchema(targetSchema);
+      insertNode.setOutSchema(targetSchema);
     }
-  }
 
-  private boolean isNoUpperProjection(Stack<OpType> stack) {
-    for (OpType node : stack) {
-      if (!( (node == OpType.Projection) || (node == OpType.Aggregation) || (node == OpType.Join) )) {
-        return false;
+    if (expr.hasLocation()) {
+      insertNode = context.queryBlock.getNodeFromExpr(expr);
+      insertNode.setTargetLocation(new Path(expr.getLocation()));
+      insertNode.setSubQuery(subQuery);
+      if (expr.hasStorageType()) {
+        insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+      }
+      if (expr.hasParams()) {
+        Options options = new Options();
+        options.putAll(expr.getParams());
+        insertNode.setOptions(options);
       }
     }
 
-    return true;
+    insertNode.setOverwrite(expr.isOverwrite());
+
+    return insertNode;
   }
 
   /*===============================================================================================
@@ -746,13 +1107,13 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    ===============================================================================================*/
 
   @Override
-  public LogicalNode visitCreateTable(PlanContext context, Stack<OpType> stack, CreateTable expr)
+  public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
       throws PlanningException {
 
     String tableName = expr.getTableName();
 
     if (expr.hasSubQuery()) {
-      stack.add(OpType.CreateTable);
+      stack.add(expr);
       LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
       stack.pop();
       StoreTableNode storeNode = new StoreTableNode(context.plan.newPID(), tableName);
@@ -794,23 +1155,22 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             mergedPartition = true;
           }
         } else {
-          throw new PlanningException(String.format("Not supported PartitonType: %s", 
-                                      expr.getPartition().getPartitionType()));
+          throw new PlanningException(String.format("Not supported PartitonType: %s",
+              expr.getPartition().getPartitionType()));
         }
       }
 
       if (mergedPartition) {
         ColumnDefinition [] merged = TUtil.concat(expr.getTableElements(),
-                          ((ColumnPartition)expr.getPartition()).getColumns());
+            ((ColumnPartition)expr.getPartition()).getColumns());
         tableSchema = convertTableElementsSchema(merged);
       } else {
         tableSchema = convertTableElementsSchema(expr.getTableElements());
       }
 
-      CreateTableNode createTableNode = new CreateTableNode(
-          context.plan.newPID(),
-          expr.getTableName(),
-          tableSchema);
+      CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
+      createTableNode.setTableName(expr.getTableName());
+      createTableNode.setSchema(tableSchema);
 
       if (expr.isExternal()) {
         createTableNode.setExternal(true);
@@ -834,12 +1194,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         createTableNode.setPath(new Path(expr.getLocation()));
       }
 
-      if (expr.hasPartition()) { 
+      if (expr.hasPartition()) {
         if (expr.getPartition().getPartitionType().equals(PartitionType.COLUMN)) {
           createTableNode.setPartitions(convertTableElementsPartition(context, expr));
         } else {
-          throw new PlanningException(String.format("Not supported PartitonType: %s", 
-                                      expr.getPartition().getPartitionType()));
+          throw new PlanningException(String.format("Not supported PartitonType: %s",
+              expr.getPartition().getPartitionType()));
         }
       }
 
@@ -856,7 +1216,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * @throws PlanningException
    */
   private PartitionDesc convertTableElementsPartition(PlanContext context,
-                                                   CreateTable expr) throws PlanningException {
+                                                      CreateTable expr) throws PlanningException {
     Schema schema = convertTableElementsSchema(expr.getTableElements());
     PartitionDesc partitionDesc = null;
     List<Specifier> specifiers = null;
@@ -909,8 +1269,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             Specifier specifier = new Specifier(eachSpec.getName());
             sb.delete(0, sb.length());
             for(Expr eachExpr : eachSpec.getValueList().getValues()) {
-              context.block.setSchema(schema);
-              EvalNode eval = createEvalTree(context.plan, context.block, eachExpr);
+              context.queryBlock.setSchema(schema);
+              EvalNode eval = exprAnnotator.createEvalNode(context.plan, context.queryBlock, eachExpr);
               if(sb.length() > 1)
                 sb.append(",");
 
@@ -936,8 +1296,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
               specifier.setName(eachSpec.getName());
 
             if (eachSpec.getEnd() != null) {
-              context.block.setSchema(schema);
-              EvalNode eval = createEvalTree(context.plan, context.block, eachSpec.getEnd());
+              context.queryBlock.setSchema(schema);
+              EvalNode eval = exprAnnotator.createEvalNode(context.plan, context.queryBlock, eachSpec.getEnd());
               specifier.setExpressions(eval.toString());
             }
 
@@ -988,7 +1348,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   private Collection<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
-                                                   ColumnReferenceExpr[] references) {
+                                                         ColumnReferenceExpr[] references) {
     List<Column> columnList = TUtil.newList();
 
     for(CreateTable.ColumnDefinition columnDefinition: elements) {
@@ -1002,10 +1362,14 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return columnList;
   }
 
-  private DataType convertDataType(DataTypeExpr dataType) {
+  private Column convertColumn(ColumnDefinition columnDefinition) {
+    return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
+  }
+
+  static TajoDataTypes.DataType convertDataType(DataTypeExpr dataType) {
     TajoDataTypes.Type type = TajoDataTypes.Type.valueOf(dataType.getTypeName());
 
-    DataType.Builder builder = DataType.newBuilder();
+    TajoDataTypes.DataType.Builder builder = TajoDataTypes.DataType.newBuilder();
     builder.setType(type);
     if (dataType.hasLengthOrPrecision()) {
       builder.setLength(dataType.getLengthOrPrecision());
@@ -1013,486 +1377,99 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return builder.build();
   }
 
-  private Column convertColumn(ColumnDefinition columnDefinition) {
-    return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
-  }
-
-  public LogicalNode visitInsert(PlanContext context, Stack<OpType> stack, Insert expr) throws PlanningException {
-    stack.push(expr.getType());
-    QueryBlock newQueryBlock = context.plan.newNoNameBlock();
-    PlanContext newContext = new PlanContext(context.plan, newQueryBlock);
-    Stack<OpType> subStack = new Stack<OpType>();
-    LogicalNode subQuery = visit(newContext, subStack, expr.getSubQuery());
-    context.plan.connectBlocks(newQueryBlock, context.block, BlockType.TableSubQuery);
-    stack.pop();
-
-    InsertNode insertNode = null;
-    if (expr.hasTableName()) {
-      TableDesc desc = catalog.getTableDesc(expr.getTableName());
-      context.block.addRelation(new ScanNode(context.plan.newPID(), desc));
-
-      Schema targetSchema = new Schema();
-      if (expr.hasTargetColumns()) {
-        // INSERT OVERWRITE INTO TABLE tbl(col1 type, col2 type) SELECT ...
-        String [] targetColumnNames = expr.getTargetColumns();
-        for (int i = 0; i < targetColumnNames.length; i++) {
-          Column targetColumn = context.plan.resolveColumn(context.block, null, new ColumnReferenceExpr(targetColumnNames[i]));
-          targetSchema.addColumn(targetColumn);
-        }
-      } else {
-        // use the output schema of select clause as target schema
-        // if didn't specific target columns like the way below,
-        // INSERT OVERWRITE INTO TABLE tbl SELECT ...
-        Schema targetTableSchema = desc.getSchema();
-        for (int i = 0; i < subQuery.getOutSchema().getColumnNum(); i++) {
-          targetSchema.addColumn(targetTableSchema.getColumn(i));
-        }
-      }
-
-      insertNode = new InsertNode(context.plan.newPID(), desc, subQuery);
-      insertNode.setTargetSchema(targetSchema);
-      insertNode.setOutSchema(targetSchema);
-    }
-
-    if (expr.hasLocation()) {
-      insertNode = new InsertNode(context.plan.newPID(), new Path(expr.getLocation()), subQuery);
-      if (expr.hasStorageType()) {
-        insertNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
-      }
-      if (expr.hasParams()) {
-        Options options = new Options();
-        options.putAll(expr.getParams());
-        insertNode.setOptions(options);
-      }
-    }
-
-    insertNode.setOverwrite(expr.isOverwrite());
-
-    return insertNode;
-  }
 
   @Override
-  public LogicalNode visitDropTable(PlanContext context, Stack<OpType> stack, DropTable dropTable) {
-    DropTableNode dropTableNode = new DropTableNode(context.plan.newPID(), dropTable.getTableName(),
-        dropTable.isPurge());
+  public LogicalNode visitDropTable(PlanContext context, Stack<Expr> stack, DropTable dropTable) {
+    DropTableNode dropTableNode = context.queryBlock.getNodeFromExpr(dropTable);
+    dropTableNode.set(dropTable.getTableName(), dropTable.isPurge());
     return dropTableNode;
   }
 
-  public static int [] dateToIntArray(String years, String months, String days) 
-    throws PlanningException { 
-    int year = Integer.valueOf(years);
-    int month = Integer.valueOf(months);
-    int day = Integer.valueOf(days);
+  /*===============================================================================================
+    Util SECTION
+  ===============================================================================================*/
 
-    if (!(1 <= year && year <= 9999)) {
-      throw new PlanningException(String.format("Years (%d) must be between 1 and 9999 integer value", year));
-    }
+  public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode groupbyNode) {
+    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
 
-    if (!(1 <= month && month <= 12)) {
-      throw new PlanningException(String.format("Months (%d) must be between 1 and 12 integer value", month));
+    if (!groupbyNode.getInSchema().containsAll(columnRefs)) {
+      return false;
     }
 
-    if (!(1<= day && day <= 31)) {
-      throw new PlanningException(String.format("Days (%d) must be between 1 and 31 integer value", day));
+    Set<String> tableIds = Sets.newHashSet();
+    // getting distinct table references
+    for (Column col : columnRefs) {
+      if (!tableIds.contains(col.getQualifier())) {
+        tableIds.add(col.getQualifier());
+      }
     }
 
-    int [] results = new int[3];
-    results[0] = year;
-    results[1] = month;
-    results[2] = day;
+    if (tableIds.size() > 1) {
+      return false;
+    }
 
-    return results;
+    return true;
   }
 
-  public static int [] timeToIntArray(String hours, String minutes, String seconds, String fractionOfSecond) 
-    throws PlanningException { 
-    int hour = Integer.valueOf(hours);
-    int minute = Integer.valueOf(minutes);
-    int second = Integer.valueOf(seconds);
-    int fraction = 0;
-    if (fractionOfSecond != null) {
-      fraction = Integer.valueOf(fractionOfSecond);
-    }
-
-    if (!(0 <= hour && hour <= 23)) {
-      throw new PlanningException(String.format("Hours (%d) must be between 0 and 24 integer value", hour));
-    }
+  public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode joinNode,
+                                                 boolean isTopMostJoin) {
+    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
 
-    if (!(0 <= minute && minute <= 59)) {
-      throw new PlanningException(String.format("Minutes (%d) must be between 0 and 59 integer value", minute));
+    if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+      return false;
     }
 
-    if (!(0 <= second && second <= 59)) {
-      throw new PlanningException(String.format("Seconds (%d) must be between 0 and 59 integer value", second));
+    if (!joinNode.getInSchema().containsAll(columnRefs)) {
+      return false;
     }
 
-    if (fraction != 0) {
-      if (!(0 <= fraction && fraction <= 999)) {
-        throw new PlanningException(String.format("Seconds (%d) must be between 0 and 999 integer value", fraction));
-      }
+    // When a 'case-when' is used with outer join, the case-when expression must be evaluated
+    // at the topmost join operator.
+    // TODO - It's also valid that case-when is evaluated at the topmost outer operator.
+    //        But, how can we know there is no further outer join operator after this node?
+    if (checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin)) {
+      return true;
+    } else {
+      return false;
     }
-
-    int [] results = new int[4];
-    results[0] = hour;
-    results[1] = minute;
-    results[2] = second;
-    results[3] = fraction;
-
-    return results;
   }
-    
-  /*===============================================================================================
-    Expression SECTION
-   ===============================================================================================*/
-
-  public EvalNode createEvalTree(LogicalPlan plan, QueryBlock block, final Expr expr)throws PlanningException {
-
-    switch(expr.getType()) {
-      // constants
-      case NullLiteral:
-        return new ConstEval(NullDatum.get());
-
-      case Literal:
-        LiteralValue literal = (LiteralValue) expr;
-        switch (literal.getValueType()) {
-          case Boolean:
-            return new ConstEval(DatumFactory.createBool(((BooleanLiteral)literal).isTrue()));
-          case String:
-            return new ConstEval(DatumFactory.createText(literal.getValue()));
-          case Unsigned_Integer:
-            return new ConstEval(DatumFactory.createInt4(literal.getValue()));
-          case Unsigned_Large_Integer:
-            return new ConstEval(DatumFactory.createInt8(literal.getValue()));
-          case Unsigned_Float:
-            return new ConstEval(DatumFactory.createFloat8(literal.getValue()));
-          default:
-            throw new RuntimeException("Unsupported type: " + literal.getValueType());
-        }
-
-      case TimeLiteral: {
-        TimeLiteral timeLiteral = (TimeLiteral) expr;
-        TimeValue timeValue = timeLiteral.getTime();
-        int [] times = LogicalPlanner.timeToIntArray(timeValue.getHours(),
-                                         timeValue.getMinutes(),
-                                         timeValue.getSeconds(),
-                                         timeValue.getSecondsFraction());
-
-        TimeDatum datum;
-        if (timeValue.hasSecondsFraction()) {
-          datum = new TimeDatum(times[0], times[1], times[2], times[3]);
-        } else {
-          datum = new TimeDatum(times[0], times[1], times[2]);
-        }
-        return new ConstEval(datum);
-      }
-
-      case DateLiteral: {
-        DateLiteral dateLiteral = (DateLiteral) expr;
-        DateValue dateValue = dateLiteral.getDate();
-        int [] dates = LogicalPlanner.dateToIntArray(dateValue.getYears(),
-            dateValue.getMonths(),
-            dateValue.getDays());
-
-        DateDatum datum;
-        datum = new DateDatum(dates[0], dates[1], dates[2]);
-        return new ConstEval(datum);
-      }
-
-      case TimestampLiteral: {
-        TimestampLiteral timestampLiteral = (TimestampLiteral) expr;
-        DateValue dateValue = timestampLiteral.getDate();
-        TimeValue timeValue = timestampLiteral.getTime();
-
-        int [] dates = LogicalPlanner.dateToIntArray(dateValue.getYears(),
-                                        dateValue.getMonths(),
-                                        dateValue.getDays());
-        int [] times = LogicalPlanner.timeToIntArray(timeValue.getHours(),
-                                        timeValue.getMinutes(),
-                                        timeValue.getSeconds(),
-                                        timeValue.getSecondsFraction());
-        DateTime dateTime;
-        if (timeValue.hasSecondsFraction()) {
-          dateTime = new DateTime(dates[0], dates[1], dates[2], times[0], times[1], times[2], times[3]);
-        } else {
-          dateTime = new DateTime(dates[0], dates[1], dates[2], times[0], times[1], times[2]);
-        }
-
-        return new ConstEval(new TimestampDatum(dateTime));
-      }
-
-      case Sign:
-        SignedExpr signedExpr = (SignedExpr) expr;
-        EvalNode numericExpr = createEvalTree(plan, block, signedExpr.getChild());
-        if (signedExpr.isNegative()) {
-          return new SignedEval(signedExpr.isNegative(), numericExpr);
-        } else {
-          return numericExpr;
-        }
-
-      case Cast:
-        CastExpr cast = (CastExpr) expr;
-        return new CastEval(createEvalTree(plan, block, cast.getOperand()),
-            convertDataType(cast.getTarget()));
-
-      case ValueList: {
-        ValueListExpr valueList = (ValueListExpr) expr;
-        Datum[] values = new Datum[valueList.getValues().length];
-        ConstEval [] constEvals = new ConstEval[valueList.getValues().length];
-        for (int i = 0; i < valueList.getValues().length; i++) {
-          constEvals[i] = (ConstEval) createEvalTree(plan, block, valueList.getValues()[i]);
-          values[i] = constEvals[i].getValue();
-        }
-        return new RowConstantEval(values);
-      }
-
-        // unary expression
-      case Not:
-        NotExpr notExpr = (NotExpr) expr;
-        return new NotEval(createEvalTree(plan, block, notExpr.getChild()));
-
-      case Between: {
-        BetweenPredicate between = (BetweenPredicate) expr;
-        BetweenPredicateEval betweenEval = new BetweenPredicateEval(between.isNot(), between.isSymmetric(),
-            createEvalTree(plan, block, between.predicand()), createEvalTree(plan, block, between.begin()),
-            createEvalTree(plan, block, between.end()));
-        return betweenEval;
-      }
-      // pattern matching predicates
-      case LikePredicate:
-      case SimilarToPredicate:
-      case Regexp:
-        PatternMatchPredicate patternMatch = (PatternMatchPredicate) expr;
-        EvalNode field = createEvalTree(plan, block, patternMatch.getPredicand());
-        ConstEval pattern = (ConstEval) createEvalTree(plan, block, patternMatch.getPattern());
-
-        // A pattern is a const value in pattern matching predicates.
-        // In a binary expression, the result is always null if a const value in left or right side is null.
-        if (pattern.getValue() instanceof NullDatum) {
-          return new ConstEval(NullDatum.get());
-        } else {
-          if (expr.getType() == OpType.LikePredicate) {
-            return new LikePredicateEval(patternMatch.isNot(), field, pattern, patternMatch.isCaseInsensitive());
-          } else if (expr.getType() == OpType.SimilarToPredicate) {
-            return new SimilarToPredicateEval(patternMatch.isNot(), field, pattern);
-          } else {
-            return new RegexPredicateEval(patternMatch.isNot(), field, pattern, patternMatch.isCaseInsensitive());
-          }
-        }
-
-      case InPredicate: {
-        InPredicate inPredicate = (InPredicate) expr;
-        FieldEval predicand =
-            new FieldEval(plan.resolveColumn(block, null, (ColumnReferenceExpr) inPredicate.getPredicand()));
-        RowConstantEval rowConstantEval = (RowConstantEval) createEvalTree(plan, block, inPredicate.getInValue());
-        return new InEval(predicand, rowConstantEval, inPredicate.isNot());
-      }
-
-      case And:
-      case Or:
-      case Equals:
-      case NotEquals:
-      case LessThan:
-      case LessThanOrEquals:
-      case GreaterThan:
-      case GreaterThanOrEquals:
-      case Plus:
-      case Minus:
-      case Multiply:
-      case Divide:
-      case Modular:
-      case Concatenate:
-        BinaryOperator bin = (BinaryOperator) expr;
-        return new BinaryEval(exprTypeToEvalType(expr.getType()),
-            createEvalTree(plan, block, bin.getLeft()),
-            createEvalTree(plan, block, bin.getRight()));
-
-      // others
-      case Column:
-        return createFieldEval(plan, block, (ColumnReferenceExpr) expr);
-
-      case CountRowsFunction: {
-        FunctionDesc countRows = catalog.getFunction("count", FunctionType.AGGREGATION, new DataType[] {});
-
-        try {
-          block.setHasGrouping();
-
-          return new AggregationFunctionCallEval(countRows, (AggFunction) countRows.newInstance(),
-              new EvalNode[] {});
-        } catch (InternalException e) {
-          throw new UndefinedFunctionException(CatalogUtil.
-              getCanonicalName(countRows.getSignature(), new DataType[]{}));
-        }
-      }
-      case GeneralSetFunction: {
-        GeneralSetFunctionExpr setFunction = (GeneralSetFunctionExpr) expr;


<TRUNCATED>

Mime
View raw message