tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [31/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)
Date Tue, 02 Jul 2013 14:16:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/QueryBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/QueryBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/QueryBlock.java
new file mode 100644
index 0000000..46fdb4b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/QueryBlock.java
@@ -0,0 +1,586 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.parser;
+
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.GsonCreator;
+import org.apache.tajo.engine.planner.JoinType;
+import org.apache.tajo.engine.planner.PlanningContext;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class contains a set of meta data about a query statement
+ */
+public class QueryBlock extends ParseTree {
+  private String storeTable = null;
+  private boolean projectAll = false;
+  private boolean distinct = false;
+  /* select target list */
+  private Target [] targetList = null;
+  /* from clause */
+  private List<FromTable> fromTables = Lists.newArrayList();
+  /* from clause with join */
+  private JoinClause joinClause = null;
+  /* where clause */
+  private EvalNode whereCond = null;
+  /* if true, there is at least one aggregation function. */
+  private boolean aggregation = false;
+  /* group-by clause; null means none (see hasGroupbyClause()) */
+  private GroupByClause groupbyClause = null;
+  /* having condition */
+  private EvalNode havingCond = null;
+  /* keys for ordering */
+  private SortSpec [] sortKeys = null;
+  /* limit clause */
+  private LimitClause limitClause = null;
+
+  /**
+   * Creates an empty query block. The planning context is handed to the
+   * superclass constructor together with {@code StatementType.SELECT}.
+   *
+   * @param context planning context passed through to {@code ParseTree}
+   */
+  public QueryBlock(PlanningContext context) {
+    super(context, StatementType.SELECT);
+  }
+
+  /**
+   * Returns the {@code type} field, which is not declared in this class
+   * (presumably inherited from {@code ParseTree} - confirm there).
+   */
+  public StatementType getStatementType() {
+    return this.type;
+  }
+
+  /**
+   * Sets the store table name.
+   * NOTE(review): presumably the table the query result is written to - confirm with callers.
+   */
+  public void setStoreTable(String storeTable) {
+    this.storeTable = storeTable;
+  }
+
+  /** Returns the store table name, or {@code null} if none has been set. */
+  public String getStoreTable() {
+    return this.storeTable;
+  }
+
+  /** Turns the project-all flag on; this class offers no way to turn it off again. */
+  public final void setProjectAll() {
+    this.projectAll = true;
+  }
+
+  /** Returns the project-all flag ({@code false} until {@link #setProjectAll()} is called). */
+  public final boolean getProjectAll() {
+    return this.projectAll;
+  }
+
+  /** Turns the distinct flag on; this class offers no way to turn it off again. */
+  public final void setDistinct() {
+    this.distinct = true;
+  }
+
+  /** Returns the distinct flag ({@code false} until {@link #setDistinct()} is called). */
+  public final boolean isDistinct() {
+    return this.distinct;
+  }
+
+  /** Replaces the select target list; the array is stored as given, without copying. */
+  public final void setTargetList(Target [] targets) {
+    this.targetList = targets;
+  }
+
+  /**
+   * Returns the select target list. This is the stored array itself, not a
+   * copy, and is {@code null} if no list has been set.
+   */
+  public final Target [] getTargetList() {
+    return this.targetList;
+  }
+
+  /**
+   * Returns {@code true} when the aggregation flag has been set or a
+   * group-by clause is present.
+   */
+  public final boolean hasAggregation() {
+    if (aggregation) {
+      return true;
+    }
+    return hasGroupbyClause();
+  }
+
+  /** Turns the aggregation flag on; this class offers no way to turn it off again. */
+  public final void setAggregation() {
+    this.aggregation = true;
+  }
+
+  /** Returns {@code true} when a non-null group-by clause is stored. */
+  public final boolean hasGroupbyClause() {
+    return this.groupbyClause != null;
+  }
+
+  /** Stores the group-by clause; passing {@code null} clears it. */
+  public final void setGroupByClause(final GroupByClause groupbyClause) {
+    this.groupbyClause = groupbyClause;
+  }
+
+  /** Returns the group-by clause, or {@code null} if none is stored. */
+  public final GroupByClause getGroupByClause() {
+    return this.groupbyClause;
+  }
+
+  /** Returns {@code true} when a non-null having condition is stored. */
+  public final boolean hasHavingCond() {
+    return this.havingCond != null;
+  }
+
+  /** Stores the having condition; passing {@code null} clears it. */
+  public final void setHavingCond(final EvalNode havingCond) {
+    this.havingCond = havingCond;
+  }
+
+  /** Returns the having condition, or {@code null} if none is stored. */
+  public final EvalNode getHavingCond() {
+    return this.havingCond;
+  }
+
+  /** Returns {@code true} when a non-null sort-key array is stored. */
+  public final boolean hasOrderByClause() {
+    return this.sortKeys != null;
+  }
+
+  /** Returns {@code true} when a non-null limit clause is stored. */
+  public final boolean hasLimitClause() {
+    return this.limitClause != null;
+  }
+
+  /**
+   * Returns the ordering keys. This is the stored array itself, not a copy,
+   * and is {@code null} if no keys have been set.
+   */
+  public final SortSpec[] getSortKeys() {
+    return this.sortKeys;
+  }
+
+  /** Stores the ordering keys; the array is kept as given, without copying. */
+  public void setSortKeys(final SortSpec [] keys) {
+    this.sortKeys = keys;
+  }
+
+  /** Stores the limit clause; passing {@code null} clears it. */
+  public void setLimit(final LimitClause limit) {
+    this.limitClause = limit;
+  }
+
+  /** Returns the limit clause, or {@code null} if none is stored. */
+  public LimitClause getLimitClause() {
+    return this.limitClause;
+  }
+
+  // From Clause
+  /**
+   * Returns {@code true} when at least one from-table has been added or a
+   * join clause is stored.
+   */
+  public final boolean hasFromClause() {
+    if (joinClause != null) {
+      return true;
+    }
+    return !fromTables.isEmpty();
+  }
+
+  /**
+   * Registers the table through the superclass {@code addTableRef} and then
+   * appends it to this block's from-table list.
+   *
+   * @param table  the from-clause table to add
+   * @param global NOTE(review): never read in this method - confirm whether
+   *               callers still rely on it
+   */
+  public final void addFromTable(final FromTable table, boolean global) {
+    super.addTableRef(table);
+    this.fromTables.add(table);
+  }
+
+  /** Returns {@code true} when a non-null join clause is stored. */
+  public final boolean hasExplicitJoinClause() {
+    return joinClause != null;
+  }
+
+  /** Stores the join clause; passing {@code null} clears it. */
+  public final void setJoinClause(final JoinClause joinClause) {
+    this.joinClause = joinClause;
+  }
+
+  /** Returns the join clause, or {@code null} if none is stored. */
+  public final JoinClause getJoinClause() {
+    return this.joinClause;
+  }
+
+  /** Returns how many tables have been added through {@code addFromTable}. */
+  public final int getNumFromTables() {
+    return fromTables.size();
+  }
+
+  public final FromTable [] getFromTables() {
+    return this.fromTables.toArray(new FromTable[fromTables.size()]);
+  }
+
+  /** Returns {@code true} when a non-null where condition is stored. */
+  public final boolean hasWhereClause() {
+    return this.whereCond != null;
+  }
+
+  /** Stores the where condition; passing {@code null} clears it. */
+  public final void setWhereCondition(final EvalNode whereCond) {
+    this.whereCond = whereCond;
+  }
+
+  /** Returns the where condition, or {@code null} if none is stored. */
+  public final EvalNode getWhereCondition() {
+    return this.whereCond;
+  }
+
+  /**
+   * Group-by clause of a query block: a list of grouping elements plus a
+   * flag that records an empty group set.
+   */
+  public static class GroupByClause implements Cloneable {
+    @Expose private boolean isEmptyGroupSet = false;
+    @Expose private List<GroupElement> groups
+        = new ArrayList<QueryBlock.GroupElement>();
+
+    public GroupByClause() {
+    }
+
+    /** Turns the empty-group-set flag on. */
+    public void setEmptyGroupSet() {
+      this.isEmptyGroupSet = true;
+    }
+
+    /** Appends a grouping element and leaves the empty-group-set flag off. */
+    public void addGroupSet(GroupElement group) {
+      this.groups.add(group);
+      this.isEmptyGroupSet = false;
+    }
+
+    /** Returns the empty-group-set flag. */
+    public boolean isEmptyGroupSet() {
+      return isEmptyGroupSet;
+    }
+
+    /** Returns a read-only view of the grouping elements. */
+    public List<GroupElement> getGroupSet() {
+      return Collections.unmodifiableList(this.groups);
+    }
+
+    /** Renders the {@code @Expose}-annotated fields as pretty-printed JSON. */
+    @Override
+    public String toString() {
+      Gson printer = new GsonBuilder()
+          .excludeFieldsWithoutExposeAnnotation()
+          .setPrettyPrinting()
+          .create();
+      return printer.toJson(this);
+    }
+
+    /**
+     * Copies this clause. The element list is a new list, but it holds the
+     * same GroupElement instances as the original.
+     */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      // super.clone() already carries the flag over; only the list needs replacing.
+      GroupByClause copy = (GroupByClause) super.clone();
+      copy.groups = new ArrayList<QueryBlock.GroupElement>(this.groups);
+      return copy;
+    }
+  }
+
+  /**
+   * One element of a group-by clause: a grouping type together with the
+   * columns it applies to.
+   */
+  public static class GroupElement implements Cloneable {
+    @Expose private GroupType type;
+    @Expose private Column [] columns;
+
+    @SuppressWarnings("unused")
+    private GroupElement() {
+      // for gson
+    }
+
+    /**
+     * @param type    the grouping type of this element
+     * @param columns the grouping columns; the array is stored without copying
+     */
+    public GroupElement(GroupType type, Column [] columns) {
+      this.type = type;
+      this.columns = columns;
+    }
+
+    /** Returns the grouping type. */
+    public GroupType getType() {
+      return this.type;
+    }
+
+    /** Returns the stored column array itself, not a copy. */
+    public Column [] getColumns() {
+      return this.columns;
+    }
+
+    /** Renders the {@code @Expose}-annotated fields as pretty-printed JSON. */
+    @Override
+    public String toString() {
+      Gson gson = new GsonBuilder().excludeFieldsWithoutExposeAnnotation()
+          .setPrettyPrinting().create();
+      return gson.toJson(this);
+    }
+
+    /**
+     * Returns a copy whose column array and every column in it are cloned,
+     * so the copy shares no Column instance with this element.
+     */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      // super.clone() already carries 'type' over; only the array needs a deep copy.
+      GroupElement copy = (GroupElement) super.clone();
+      // columns is null when the instance came from the no-arg (gson) constructor.
+      if (columns != null) {
+        copy.columns = new Column[columns.length];
+        for (int i = 0; i < columns.length; i++) {
+          // Index exactly once per iteration. The previous 'copy.columns[i++] = columns[i]'
+          // bumped i before the right-hand side was read, so it copied the wrong
+          // column, left gaps, and ran past the end of odd-length arrays.
+          copy.columns[i] = (Column) columns[i].clone();
+        }
+      }
+      return copy;
+    }
+  }
+
+  /** Grouping kinds a {@code GroupElement} can carry. */
+  public enum GroupType {
+    GROUPBY,
+    CUBE,
+    ROLLUP,
+    EMPTYSET
+  }
+
+  /**
+   * A join in the from clause. The right side is a table; the left side may
+   * be stored as a table ({@code left}) or as a nested join ({@code leftJoin}).
+   * A join qualifier, join columns and a natural flag can be attached.
+   */
+  public static class JoinClause implements Cloneable {
+    @Expose private JoinType joinType;
+    @Expose private FromTable left;
+    @Expose private FromTable right;
+    @Expose private JoinClause leftJoin;
+    @Expose private EvalNode joinQual;
+    @Expose private Column [] joinColumns;
+    @Expose private boolean natural = false;
+
+    @SuppressWarnings("unused")
+    private JoinClause() {
+      // for gson
+    }
+
+    /** Creates a join of the given type with no operands set yet. */
+    public JoinClause(final JoinType joinType) {
+      this.joinType = joinType;
+    }
+
+    /** Creates a join of the given type with only the right table set. */
+    public JoinClause(final JoinType joinType, final FromTable right) {
+      this(joinType);
+      this.right = right;
+    }
+
+    /** Creates a join of the given type between two tables. */
+    public JoinClause(final JoinType joinType, final FromTable left,
+                      final FromTable right) {
+      this(joinType);
+      this.left = left;
+      this.right = right;
+    }
+
+    public JoinType getJoinType() {
+      return joinType;
+    }
+
+    /** Turns the natural flag on; there is no way to turn it off again. */
+    public void setNatural() {
+      natural = true;
+    }
+
+    public boolean isNatural() {
+      return natural;
+    }
+
+    public void setRight(FromTable right) {
+      this.right = right;
+    }
+
+    /** Sets the left operand as a table; does not touch the nested left join. */
+    public void setLeft(FromTable left) {
+      this.left = left;
+    }
+
+    /** Sets the left operand as a nested join; does not touch the left table. */
+    public void setLeft(JoinClause left) {
+      this.leftJoin = left;
+    }
+
+    /** Returns {@code true} when a nested left join is stored. */
+    public boolean hasLeftJoin() {
+      return getLeftJoinOrNull() != null;
+    }
+
+    public FromTable getLeft() {
+      return left;
+    }
+
+    public FromTable getRight() {
+      return right;
+    }
+
+    public JoinClause getLeftJoin() {
+      return getLeftJoinOrNull();
+    }
+
+    public void setJoinQual(EvalNode qual) {
+      joinQual = qual;
+    }
+
+    /** Returns {@code true} when a join qualifier is stored. */
+    public boolean hasJoinQual() {
+      return joinQual != null;
+    }
+
+    public EvalNode getJoinQual() {
+      return joinQual;
+    }
+
+    /** Stores the join columns; the array is kept as given, without copying. */
+    public void setJoinColumns(Column [] columns) {
+      joinColumns = columns;
+    }
+
+    /** Returns {@code true} when a join-column array is stored. */
+    public boolean hasJoinColumns() {
+      return joinColumns != null;
+    }
+
+    public Column [] getJoinColumns() {
+      return joinColumns;
+    }
+
+    /** Renders this clause as pretty-printed JSON using a default Gson builder. */
+    @Override
+    public String toString() {
+      return new GsonBuilder().setPrettyPrinting().create().toJson(this);
+    }
+
+    /** Single read point for the nested left join field. */
+    private JoinClause getLeftJoinOrNull() {
+      return leftJoin;
+    }
+  }
+
+  /**
+   * One entry of the select target list: an expression tree, the column
+   * derived from it, an optional alias and a target id.
+   */
+  public static class Target implements Cloneable {
+    @Expose private EvalNode eval;
+    @Expose private Column column;
+    @Expose private String alias = null;
+    @Expose private int targetId;
+
+    /**
+     * Builds a target without an alias. The column takes the expression's
+     * name; its type is {@code ARRAY} for an aggregation function with more
+     * than one value type, otherwise the expression's first value type.
+     */
+    public Target(EvalNode eval, int targetId) {
+      this.eval = eval;
+      this.targetId = targetId;
+
+      // hack for partial result
+      boolean multiValuedAggregate =
+          eval.getType() == EvalNode.Type.AGG_FUNCTION
+              && eval.getValueType().length > 1;
+      if (multiValuedAggregate) {
+        this.column = new Column(eval.getName(), Type.ARRAY);
+      } else {
+        this.column = new Column(eval.getName(), eval.getValueType()[0]);
+      }
+    }
+
+    /** Builds a target as above and then attaches the given alias. */
+    public Target(final EvalNode eval, final String alias, int targetId) {
+      this(eval, targetId);
+      this.alias = alias;
+    }
+
+    public final void setAlias(String alias) {
+      this.alias = alias;
+    }
+
+    /** Returns the alias, or {@code null} if none is set. */
+    public final String getAlias() {
+      return this.alias;
+    }
+
+    public final boolean hasAlias() {
+      return this.alias != null;
+    }
+
+    public EvalNode getEvalTree() {
+      return eval;
+    }
+
+    public Column getColumnSchema() {
+      return column;
+    }
+
+    public int getTargetId() {
+      return targetId;
+    }
+
+    /** Returns the expression's text, followed by ", alias=..." when an alias is set. */
+    @Override
+    public String toString() {
+      String text = eval.toString();
+      if (hasAlias()) {
+        text = text + ", alias=" + alias;
+      }
+      return text;
+    }
+
+    /** Two targets are equal when expression, column and alias all match. */
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof Target)) {
+        return false;
+      }
+      Target that = (Target) obj;
+
+      // All three comparisons are evaluated before they are combined.
+      boolean sameEval = eval.equals(that.eval);
+      boolean sameColumn = column.equals(that.column);
+      boolean sameAlias = TUtil.checkEquals(alias, that.alias);
+      return sameEval && sameColumn && sameAlias;
+    }
+
+    /** Hash is derived from the expression name only. */
+    @Override
+    public int hashCode() {
+      String evalName = this.eval.getName();
+      return evalName.hashCode();
+    }
+
+    /** Returns a copy with its own clones of the expression tree and the column. */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      // super.clone() already copies alias and targetId.
+      Target copy = (Target) super.clone();
+      copy.eval = (EvalNode) eval.clone();
+      copy.column = (Column) column.clone();
+      return copy;
+    }
+
+    /** Serializes this target with the shared {@code GsonCreator} instance. */
+    public String toJSON() {
+      Gson gson = GsonCreator.getInstance();
+      return gson.toJson(this, Target.class);
+    }
+  }
+
+  /**
+   * A table referenced in the from clause: a table description plus an
+   * optional alias.
+   *
+   * NOTE(review): equals() is overridden without hashCode(); confirm how
+   * TableDesc defines equality before using instances as hash keys.
+   */
+  public static class FromTable implements Cloneable {
+    @Expose
+    private TableDesc desc;
+    @Expose
+    private String alias = null;
+
+    public FromTable() {}
+
+    public FromTable(final TableDesc desc) {
+      this.desc = desc;
+    }
+
+    public FromTable(final TableDesc desc, final String alias) {
+      this.desc = desc;
+      this.alias = alias;
+    }
+
+    /** Returns the id of the table description. */
+    public final String getTableName() {
+      return this.desc.getId();
+    }
+
+    /** Returns the alias when one is set, otherwise the table description's id. */
+    public final String getTableId() {
+      if (alias != null) {
+        return alias;
+      }
+      return desc.getId();
+    }
+
+    public final CatalogProtos.StoreType getStoreType() {
+      return this.desc.getMeta().getStoreType();
+    }
+
+    public final Schema getSchema() {
+      return this.desc.getMeta().getSchema();
+    }
+
+    public final void setAlias(String alias) {
+      this.alias = alias;
+    }
+
+    /** Returns the alias, or {@code null} if none is set. */
+    public final String getAlias() {
+      return this.alias;
+    }
+
+    public final boolean hasAlias() {
+      return this.alias != null;
+    }
+
+    /** Returns "id as alias" when an alias is set, otherwise just the id. */
+    @Override
+    public final String toString() {
+      return alias != null ? desc.getId() + " as " + alias : desc.getId();
+    }
+
+    /** Two from-tables are equal when description and alias both match. */
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof FromTable)) {
+        return false;
+      }
+      FromTable that = (FromTable) obj;
+      return this.desc.equals(that.desc)
+          && TUtil.checkEquals(this.alias, that.alias);
+    }
+
+    /** Returns a copy with its own clone of the table description. */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      // super.clone() already copies the alias reference.
+      FromTable copy = (FromTable) super.clone();
+      copy.desc = (TableDesc) desc.clone();
+      return copy;
+    }
+
+    /**
+     * Calls {@code initFromProto()} on the description and then serializes
+     * this object with the shared {@code GsonCreator} instance.
+     */
+    public String toJSON() {
+      desc.initFromProto();
+      return GsonCreator.getInstance().toJson(this, FromTable.class);
+    }
+  }
+
+  /**
+   * Limit clause of a query block; holds the number of rows to fetch.
+   */
+  public static class LimitClause implements Cloneable {
+    @Expose private long fetchFirstNum;
+
+    /**
+     * @param fetchFirstNum the row count reported by {@link #getLimitRow()}
+     */
+    public LimitClause(long fetchFirstNum) {
+      this.fetchFirstNum = fetchFirstNum;
+    }
+
+    /** Returns the row count given to the constructor. */
+    public long getLimitRow() {
+      return this.fetchFirstNum;
+    }
+
+    /** Returns "LIMIT " followed by the row count. */
+    @Override
+    public String toString() {
+      return "LIMIT " + fetchFirstNum;
+    }
+
+    /** Two limit clauses are equal when their row counts are equal. */
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof LimitClause &&
+          fetchFirstNum == ((LimitClause)obj).fetchFirstNum;
+    }
+
+    /**
+     * Hashes the one field equals() compares, so equal clauses always share
+     * a hash code. Without this override, equal instances could land in
+     * different buckets of a hash-based collection.
+     */
+    @Override
+    public int hashCode() {
+      return (int) (fetchFirstNum ^ (fetchFirstNum >>> 32));
+    }
+
+    /** Returns a copy carrying the same row count. */
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      // The only field is a primitive, so Object.clone()'s field copy is complete.
+      return super.clone();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java
new file mode 100644
index 0000000..126e4bd
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SetStmt.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.parser;
+
+import org.apache.tajo.engine.planner.PlanningContext;
+
+import java.util.Map.Entry;
+
+/**
+ * A parse tree with a left and a right operand tree and a distinct flag.
+ * On construction it registers every alias/name pair of both operands on
+ * itself.
+ */
+public class SetStmt extends ParseTree {
+  private ParseTree leftTree;
+  private ParseTree rightTree;
+  private boolean distinct = true;
+
+  /**
+   * @param context   planning context passed to the superclass
+   * @param type      statement type passed to the superclass
+   * @param leftTree  left operand; its table references are registered first
+   * @param rightTree right operand; its table references are registered second
+   * @param distinct  value reported by {@link #isDistinct()}
+   */
+  public SetStmt(final PlanningContext context,
+                 final StatementType type,
+                 final ParseTree leftTree,
+                 final ParseTree rightTree,
+                 boolean distinct) {
+    super(context, type);
+    this.leftTree = leftTree;
+    this.rightTree = rightTree;
+    this.distinct = distinct;
+
+    // Left operand first, then right; each entry maps alias (key) to name (value).
+    for (ParseTree operand : new ParseTree[] {leftTree, rightTree}) {
+      for (Entry<String, String> aliasToName : operand.getAliasToNames()) {
+        addTableRef(aliasToName.getValue(), aliasToName.getKey());
+      }
+    }
+  }
+
+  /** Returns the distinct flag given to the constructor. */
+  public boolean isDistinct() {
+    return this.distinct;
+  }
+
+  /** Returns the left operand tree. */
+  public ParseTree getLeftTree() {
+    return leftTree;
+  }
+
+  /** Returns the right operand tree. */
+  public ParseTree getRightTree() {
+    return rightTree;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java
new file mode 100644
index 0000000..de9b7e6
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/StatementType.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.parser;
+
+/**
+ * Statement kinds a parse tree can be tagged with, grouped by purpose.
+ */
+public enum StatementType {
+  // Store
+  STORE,
+  CREATE_TABLE_AS,
+  COPY,
+
+  // Select
+  SELECT,
+  UNION,
+  EXCEPT,
+  INTERSECT,
+
+  // Update
+  INSERT,
+  UPDATE,
+  DELETE,
+
+  // Schema
+  CREATE_TABLE,
+  DROP_TABLE,
+
+  // Index
+  CREATE_INDEX,
+  DROP_INDEX,
+
+  // Control
+  SHOW_TABLES,
+  DESC_TABLE,
+  SHOW_FUNCTION,
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java
new file mode 100644
index 0000000..2fce526
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/TableMap.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.parser;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+public class TableMap {
+  private Map<String, String> aliasToNameMap = Maps.newHashMap();
+  private Map<String, Set<String>> nameToAliasMap = Maps.newHashMap();
+
+  public boolean exists(String alias) {
+    return aliasToNameMap.containsKey(alias);
+  }
+
+  public void addFromTable(String tableName, String alias) {
+    if (aliasToNameMap.containsKey(alias)) {
+      throw new AlreadyExistsTableException();
+    }
+    aliasToNameMap.put(alias, tableName);
+
+    if (nameToAliasMap.containsKey(tableName)) {
+      Preconditions.checkState(!nameToAliasMap.get(tableName).contains(alias),
+          "There is inconsistency of the map between name and alias");
+      nameToAliasMap.get(tableName).add(alias);
+    } else {
+      nameToAliasMap.put(tableName, Sets.newHashSet(alias));
+    }
+  }
+
+  public void addFromTable(QueryBlock.FromTable fromTable) {
+    addFromTable(fromTable.getTableName(), fromTable.getTableId());
+  }
+
+  public String getTableNameByAlias(String alias) {
+    return aliasToNameMap.get(alias);
+  }
+
+  public Iterable<String> getAllTableNames() {
+    return nameToAliasMap.keySet();
+  }
+
+  public Iterable<Entry<String, String>> getAliasToNames() {
+    return aliasToNameMap.entrySet();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java
new file mode 100644
index 0000000..2fbdab1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/JoinType.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+/** Kinds of join, as carried by {@code QueryBlock.JoinClause}. */
+public enum JoinType {
+  CROSS_JOIN,
+  INNER,
+  LEFT_OUTER,
+  RIGHT_OUTER,
+  FULL_OUTER,
+  UNION
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
new file mode 100644
index 0000000..d62ec69
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -0,0 +1,742 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.parser.CreateTableStmt;
+import org.apache.tajo.engine.parser.ParseTree;
+import org.apache.tajo.engine.parser.QueryBlock;
+import org.apache.tajo.engine.parser.QueryBlock.Target;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.query.exception.InvalidQueryException;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.util.IndexUtil;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * This class optimizes a logical plan corresponding to one query block.
+ */
+public class LogicalOptimizer {
+  private static Log LOG = LogFactory.getLog(LogicalOptimizer.class);
+  
+  private LogicalOptimizer() {
+  }
+
+  public static LogicalNode optimize(PlanningContext context, LogicalNode plan) {
+    LogicalNode toBeOptimized;
+
+    try {
+      toBeOptimized = (LogicalNode) plan.clone();
+    } catch (CloneNotSupportedException e) {
+      LOG.error(e);
+      throw new InvalidQueryException("Cannot clone: " + plan);
+    }
+
+    switch (context.getParseTree().getType()) {
+      case SELECT:
+        //case UNION: // TODO - to be implemented
+        //case EXCEPT:
+        //case INTERSECT:
+      case CREATE_TABLE_AS:
+        // if there are selection node
+        if(PlannerUtil.findTopNode(plan, ExprType.SELECTION) != null) {
+          pushSelection(context, toBeOptimized);
+        }
+
+        try {
+          pushProjection(context, toBeOptimized);
+        } catch (CloneNotSupportedException e) {
+          throw new InvalidQueryException(e);
+        }
+
+        break;
+      default:
+    }
+
+    return toBeOptimized;
+  }
+  
+  /**
+   * Replaces the scan nodes of a plan with index scan nodes wherever an
+   * index can serve the scan.
+   * <br />
+   * The rewrite works on a clone of the given plan. A plan without any scan
+   * node is returned as is.
+   *
+   * @param plan the logical plan to be rewritten
+   * @param sm the storage manager used to look up the index of a scanned table
+   * @return the given plan if it contains no scan node; otherwise, the rewritten clone
+   * @throws IOException if the index information cannot be accessed
+   * @throws InvalidQueryException if the plan cannot be cloned
+   */
+  public static LogicalNode pushIndex(LogicalNode plan , StorageManager sm) throws IOException {
+    if(PlannerUtil.findTopNode(plan, ExprType.SCAN) == null) {
+      return plan;
+    }
+    LogicalNode toBeOptimized;
+    try {
+      toBeOptimized = (LogicalNode) plan.clone();
+    } catch (CloneNotSupportedException e) {
+      LOG.error(e);
+      throw new InvalidQueryException("Cannot clone: " + plan);
+    }
+
+    // A plan that consists of a scan only has no parent whose child could be
+    // re-linked, so the index scan itself becomes the resulting plan.
+    if (!(toBeOptimized instanceof BinaryNode)
+        && toBeOptimized.getType() == ExprType.SCAN) {
+      IndexScanNode indexScan = findIndexScan((ScanNode) toBeOptimized, sm);
+      return indexScan != null ? indexScan : toBeOptimized;
+    }
+
+    changeScanToIndexNode(null, toBeOptimized, sm);
+    return toBeOptimized;
+  }
+
+  /**
+   * Visits the plan from cur downwards and replaces each scan node, for
+   * which an index scan node can be built, in its parent.
+   *
+   * @param parent the parent of cur; null if cur is the root of the plan
+   * @param cur the node to be visited
+   * @param sm the storage manager used to look up the index of a scanned table
+   * @throws IOException if the index information cannot be accessed
+   */
+  private static void changeScanToIndexNode
+    (LogicalNode parent , LogicalNode cur , StorageManager sm ) throws IOException {
+    if (cur instanceof BinaryNode) {
+      changeScanToIndexNode(cur, ((BinaryNode) cur).getOuterNode(), sm);
+      changeScanToIndexNode(cur, ((BinaryNode) cur).getInnerNode(), sm);
+      return;
+    }
+
+    switch (cur.getType()) {
+      case CREATE_INDEX:
+        return;
+      case SCAN:
+        ScanNode scan = (ScanNode) cur;
+        IndexScanNode node = findIndexScan(scan, sm);
+        // Without a parent there is nothing to re-link; pushIndex deals with
+        // a scan at the root of the plan on its own.
+        if (node == null || parent == null) {
+          return;
+        }
+        if (parent instanceof BinaryNode) {
+          BinaryNode binaryParent = (BinaryNode) parent;
+          if (scan.equals(binaryParent.getOuterNode())) {
+            binaryParent.setOuter(node);
+          } else {
+            binaryParent.setInner(node);
+          }
+        } else {
+          ((UnaryNode) parent).setSubNode(node);
+        }
+        return;
+      default:
+        // only unary nodes have a child to descend into; other nodes end the walk
+        if (cur instanceof UnaryNode) {
+          changeScanToIndexNode(cur, ((UnaryNode) cur).getSubNode(), sm);
+        }
+        break;
+    }
+  }
+
+  /**
+   * Builds the index scan node that can replace the given scan node.
+   *
+   * @param scan the scan node to be replaced
+   * @param sm the storage manager used to look up the index of the scanned table
+   * @return the index scan node; null if the scan has no qualifier, if the
+   *         "index" path of the table does not exist, or if
+   *         IndexUtil.indexEval yields no index scan
+   * @throws IOException if the index information cannot be accessed
+   */
+  private static IndexScanNode findIndexScan(ScanNode scan, StorageManager sm)
+      throws IOException {
+    if (scan.getQual() == null) {
+      return null;
+    }
+    Path path = new Path(sm.getTablePath(scan.getTableId()), "index");
+    if (!sm.getFileSystem().exists(path)) {
+      return null;
+    }
+    TableMeta meta = sm.getTableMeta(path);
+    return IndexUtil.indexEval(scan, meta.getOptions());
+  }
+  /**
+   * Pushes the search conditions of the topmost selection down into the
+   * appropriate sub logical operators.
+   * <br />
+   *
+   * Three operators can have search conditions: selection, join, and
+   * group-by. The search conditions of join and group-by cannot be pushed
+   * down into child operators because they apply to the data layout that
+   * the join or the grouping itself produces.
+   * <br />
+   *
+   * Some search conditions of a selection, however, can be pushed down into
+   * appropriate sub operators. A comparison expression on multiple relations
+   * is actually a join condition, and an expression on a single relation can
+   * be used in a scan operator or an index scan operator.
+   *
+   * @param ctx the planning context
+   * @param plan the logical plan to be rewritten; it must contain a selection node
+   */
+  private static void pushSelection(PlanningContext ctx, LogicalNode plan) {
+    SelectionNode topSelection =
+        (SelectionNode) PlannerUtil.findTopNode(plan, ExprType.SELECTION);
+    Preconditions.checkNotNull(topSelection);
+
+    // a mutable list is needed because the recursion removes each condition it has placed
+    List<EvalNode> conditions =
+        Lists.newArrayList(EvalTreeUtil.getConjNormalForm(topSelection.getQual()));
+    pushSelectionRecursive(plan, conditions, new Stack<LogicalNode>());
+  }
+
+  /**
+   * Visits the plan recursively and attaches each search condition to the
+   * join or scan node at which it can be evaluated.
+   * A condition that has been attached is removed from cnf. A selection node
+   * is unlinked from its parent if no condition is left after the push-down
+   * below it.
+   *
+   * @param plan the node to be visited
+   * @param cnf the search conditions that are not pushed down yet
+   * @param stack the ancestors of the visited node
+   * @throws InvalidQueryException if a selection node to be removed has a non-unary parent
+   */
+  private static void pushSelectionRecursive(LogicalNode plan,
+                                             List<EvalNode> cnf, Stack<LogicalNode> stack) {
+    switch (plan.getType()) {
+
+    case SELECTION: {
+      SelectionNode selection = (SelectionNode) plan;
+      stack.push(selection);
+      pushSelectionRecursive(selection.getSubNode(), cnf, stack);
+      stack.pop();
+
+      // the selection operator is removed if no search condition remains
+      // after the selection push
+      if (cnf.isEmpty()) {
+        LogicalNode parent = stack.peek();
+        if (!(parent instanceof UnaryNode)) {
+          throw new InvalidQueryException("Unexpected Logical Query Plan");
+        }
+        ((UnaryNode) parent).setSubNode(selection.getSubNode());
+      }
+      break;
+    }
+
+    case JOIN: {
+      JoinNode join = (JoinNode) plan;
+      LogicalNode outerChild = join.getOuterNode();
+      LogicalNode innerChild = join.getInnerNode();
+
+      // the children take their conditions before the join itself
+      pushSelectionRecursive(outerChild, cnf, stack);
+      pushSelectionRecursive(innerChild, cnf, stack);
+
+      List<EvalNode> joinConds = findEvaluable(cnf, plan);
+      EvalNode pushed = mergeIntoSingleQual(joinConds);
+      if (pushed != null) {
+        EvalNode newJoinQual = join.hasJoinQual()
+            ? EvalTreeUtil.transformCNF2Singleton(join.getJoinQual(), pushed)
+            : pushed;
+        join.setJoinQual(newJoinQual);
+
+        // a cross join that gets a join condition becomes an inner join
+        if (join.getJoinType() == JoinType.CROSS_JOIN) {
+          join.setJoinType(JoinType.INNER);
+        }
+        cnf.removeAll(joinConds);
+      }
+      break;
+    }
+
+    case SCAN: {
+      List<EvalNode> scanConds = findEvaluable(cnf, plan);
+      EvalNode pushed = mergeIntoSingleQual(scanConds);
+      if (pushed != null) { // if a matched qual exists
+        ((ScanNode) plan).setQual(pushed);
+      }
+      cnf.removeAll(scanConds);
+      break;
+    }
+
+    default: {
+      stack.push(plan);
+      if (plan instanceof UnaryNode) {
+        pushSelectionRecursive(((UnaryNode) plan).getSubNode(), cnf, stack);
+      } else if (plan instanceof BinaryNode) {
+        BinaryNode binary = (BinaryNode) plan;
+        pushSelectionRecursive(binary.getOuterNode(), cnf, stack);
+        pushSelectionRecursive(binary.getInnerNode(), cnf, stack);
+      }
+      stack.pop();
+      break;
+    }
+    }
+  }
+
+  /** Returns the conditions of cnf that can be evaluated at the given node. */
+  private static List<EvalNode> findEvaluable(List<EvalNode> cnf, LogicalNode node) {
+    List<EvalNode> evaluable = Lists.newArrayList();
+    for (EvalNode cond : cnf) {
+      if (canBeEvaluated(cond, node)) {
+        evaluable.add(cond);
+      }
+    }
+    return evaluable;
+  }
+
+  /** Merges the conditions into one eval tree; returns null if there is no condition. */
+  private static EvalNode mergeIntoSingleQual(List<EvalNode> conds) {
+    if (conds.size() > 1) {
+      return EvalTreeUtil.transformCNF2Singleton(
+          conds.toArray(new EvalNode [conds.size()]));
+    }
+    // a single condition is used as it is
+    return conds.size() == 1 ? conds.get(0) : null;
+  }
+  
+  /**
+   * Checks whether the given expression can be evaluated at the given node.
+   * <br />
+   * For a join node, the expression must refer to exactly two relations, one
+   * of which belongs to the lineage of the outer child and the other to the
+   * lineage of the inner child. For any other node, all columns that the
+   * expression refers to must be contained in the input schema of the node.
+   *
+   * @param eval the expression to be checked
+   * @param node the logical node at which the expression would be evaluated
+   * @return true if the expression can be evaluated at the node
+   * @throws InvalidQueryException if the lineage of a join child is not available
+   */
+  public static boolean canBeEvaluated(EvalNode eval, LogicalNode node) {
+    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(eval);
+
+    if (node.getType() != ExprType.JOIN) {
+      for (Column col : columnRefs) {
+        if (!node.getInSchema().contains(col.getQualifiedName())) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    JoinNode joinNode = (JoinNode) node;
+    // getting distinct table references
+    Set<String> tableIds = Sets.newHashSet();
+    for (Column col : columnRefs) {
+      tableIds.add(col.getTableName());
+    }
+
+    // if the references only indicate two relations, the condition can be
+    // pushed into a join operator.
+    if (tableIds.size() != 2) {
+      return false;
+    }
+
+    String [] outer = PlannerUtil.getLineage(joinNode.getOuterNode());
+    String [] inner = PlannerUtil.getLineage(joinNode.getInnerNode());
+    // This check has to precede the set construction below, which would
+    // otherwise fail with a NullPointerException on a missing lineage.
+    if (outer == null || inner == null) {
+      throw new InvalidQueryException("ERROR: Unexpected logical plan");
+    }
+    Set<String> o = Sets.newHashSet(outer);
+    Set<String> i = Sets.newHashSet(inner);
+
+    // the two relations must come from different sides of the join, in either order
+    Iterator<String> it = tableIds.iterator();
+    String first = it.next();
+    String second = it.next();
+    return (o.contains(first) && i.contains(second))
+        || (i.contains(first) && o.contains(second));
+  }
+
+  /**
+   * This method pushes down the projection list into the appropriate and
+   * below logical operators.
+   * @param context
+   * @param plan
+   */
+  private static void pushProjection(PlanningContext context,
+                                     LogicalNode plan)
+      throws CloneNotSupportedException {
+    Stack<LogicalNode> stack = new Stack<LogicalNode>();
+
+    ParseTree tree = context.getParseTree();
+    QueryBlock block;
+
+    if (tree instanceof QueryBlock) {
+      block = (QueryBlock) context.getParseTree();
+
+    } else if (tree instanceof CreateTableStmt) {
+
+      CreateTableStmt createTableStmt = (CreateTableStmt) tree;
+
+      if (createTableStmt.hasQueryBlock()) {
+        block = createTableStmt.getSelectStmt();
+      } else {
+        return;
+      }
+    } else {
+
+      return;
+    }
+    OptimizationContext optCtx = new OptimizationContext(context,
+        block.getTargetList());
+
+    pushProjectionRecursive(context, optCtx, plan, stack, new HashSet<Column>());
+  }
+
+  /**
+   * Groupby, Join, and Scan can project necessary columns.
+   * This method has three roles:
+   * 1) collect the column references necessary for sort keys, join keys, selection conditions, grouping fields,
+   * and having conditions
+   * 2) shrink the output schema of each operator so that the operator reduces the output columns according to
+   * the necessary columns of their parent operators
+   * 3) shrink the input schema of each operator according to the shrunk output schemas of the child operators.
+   *
+   * @param ctx the planning context
+   * @param optContext the optimization context that tracks which targets are already evaluated
+   * @param node the logical node to be visited
+   * @param stack the nodes visited on the way down to the given node
+   * @param necessary the columns necessary for the logical nodes above, excluding the fields of the
+   *                  target list; the set is extended while the plan is descended
+   * @return the node that takes the place of the given node: the child of a projection node if the
+   *         projection node is removed from the plan, and the given node otherwise
+   * @throws CloneNotSupportedException if a target cannot be cloned
+   */
+  private static LogicalNode pushProjectionRecursive(
+      final PlanningContext ctx, final OptimizationContext optContext,
+      final LogicalNode node, final Stack<LogicalNode> stack,
+      final Set<Column> necessary) throws CloneNotSupportedException {
+
+    LogicalNode outer;
+    LogicalNode inner;
+    switch (node.getType()) {
+      case ROOT: // It does not support the projection
+        LogicalRootNode root = (LogicalRootNode) node;
+        // NOTE(review): unlike most other cases, the node is not popped from
+        // the stack after the recursion - confirm that it is intended.
+        stack.add(root);
+        outer = pushProjectionRecursive(ctx, optContext,
+            root.getSubNode(), stack, necessary);
+        // the schemas just follow the output schema of the child
+        root.setInSchema(outer.getOutSchema());
+        root.setOutSchema(outer.getOutSchema());
+        break;
+
+      case STORE:
+        StoreTableNode store = (StoreTableNode) node;
+        // NOTE(review): the node is not popped from the stack here either.
+        stack.add(store);
+        outer = pushProjectionRecursive(ctx, optContext,
+            store.getSubNode(), stack, necessary);
+        store.setInSchema(outer.getOutSchema());
+        store.setOutSchema(outer.getOutSchema());
+        break;
+
+      case PROJECTION:
+        ProjectionNode projNode = (ProjectionNode) node;
+
+        stack.add(projNode);
+        outer = pushProjectionRecursive(ctx, optContext,
+            projNode.getSubNode(), stack, necessary);
+        stack.pop();
+
+        LogicalNode childNode = projNode.getSubNode();
+        if (optContext.getTargetListManager().isAllEvaluated() // if all exprs are evaluated
+            && (childNode.getType() == ExprType.JOIN ||
+               childNode.getType() == ExprType.GROUP_BY ||
+               childNode.getType() == ExprType.SCAN)) { // if the child node is projectable
+            // The projection node is no longer needed: its child gets the final
+            // schema and is linked to the parent of the projection node instead.
+            projNode.getSubNode().setOutSchema(
+                optContext.getTargetListManager().getUpdatedSchema());
+            LogicalNode parent = stack.peek();
+            ((UnaryNode)parent).setSubNode(projNode.getSubNode());
+            return projNode.getSubNode();
+        } else {
+          // the output schema is not changed.
+          projNode.setInSchema(outer.getOutSchema());
+          projNode.setTargetList(
+              optContext.getTargetListManager().getUpdatedTarget());
+        }
+        return projNode;
+
+      case SELECTION: // It does not support the projection
+        SelectionNode selNode = (SelectionNode) node;
+
+        // the columns of the search condition must survive the projection below
+        if (selNode.getQual() != null) {
+          necessary.addAll(EvalTreeUtil.findDistinctRefColumns(selNode.getQual()));
+        }
+
+        stack.add(selNode);
+        outer = pushProjectionRecursive(ctx, optContext, selNode.getSubNode(),
+            stack, necessary);
+        stack.pop();
+        selNode.setInSchema(outer.getOutSchema());
+        selNode.setOutSchema(outer.getOutSchema());
+        break;
+
+      case GROUP_BY: {
+        GroupbyNode groupByNode = (GroupbyNode)node;
+
+        // the columns of the having condition must survive the projection below
+        if (groupByNode.hasHavingCondition()) {
+          necessary.addAll(EvalTreeUtil.findDistinctRefColumns(groupByNode.getHavingCondition()));
+        }
+
+        stack.add(groupByNode);
+        outer = pushProjectionRecursive(ctx, optContext,
+            groupByNode.getSubNode(), stack, necessary);
+        stack.pop();
+        groupByNode.setInSchema(outer.getOutSchema());
+        // set all targets
+        groupByNode.setTargetList(optContext.getTargetListManager().getUpdatedTarget());
+
+        TargetListManager targets = optContext.getTargetListManager();
+        // NOTE(review): both lists are filled below but never read in this method.
+        List<Target> groupbyPushable = Lists.newArrayList();
+        List<Integer> groupbyPushableId = Lists.newArrayList();
+
+        // mark each non-field target that contains an aggregation function and
+        // can be evaluated at this node as evaluated
+        EvalNode expr;
+        for (int i = 0; i < targets.size(); i++) {
+          expr = targets.getTarget(i).getEvalTree();
+          if (canBeEvaluated(expr, groupByNode) &&
+              EvalTreeUtil.findDistinctAggFunction(expr).size() > 0 && expr.getType() != EvalNode.Type.FIELD) {
+            targets.setEvaluated(i);
+            groupbyPushable.add((Target) targets.getTarget(i).clone());
+            groupbyPushableId.add(i);
+          }
+        }
+
+        return groupByNode;
+      }
+
+      case SORT: // It does not support the projection
+        SortNode sortNode = (SortNode) node;
+
+        // the sort keys must survive the projection below
+        for (SortSpec spec : sortNode.getSortKeys()) {
+          necessary.add(spec.getSortKey());
+        }
+
+        stack.add(sortNode);
+        outer = pushProjectionRecursive(ctx, optContext,
+            sortNode.getSubNode(), stack, necessary);
+        stack.pop();
+        sortNode.setInSchema(outer.getOutSchema());
+        sortNode.setOutSchema(outer.getOutSchema());
+        break;
+
+
+      case JOIN: {
+        JoinNode joinNode = (JoinNode) node;
+        // snapshot of the columns required by the nodes above, taken before
+        // the columns of the join condition are added
+        Set<Column> parentNecessary = Sets.newHashSet(necessary);
+
+        if (joinNode.hasJoinQual()) {
+          necessary.addAll(EvalTreeUtil.findDistinctRefColumns(joinNode.getJoinQual()));
+        }
+
+        stack.add(joinNode);
+        outer = pushProjectionRecursive(ctx, optContext,
+            joinNode.getOuterNode(), stack, necessary);
+        inner = pushProjectionRecursive(ctx, optContext,
+            joinNode.getInnerNode(), stack, necessary);
+        stack.pop();
+        // the input schema of the join is the merged output of both children
+        Schema merged = SchemaUtil
+            .merge(outer.getOutSchema(), inner.getOutSchema());
+        joinNode.setInSchema(merged);
+
+        TargetListManager targets = optContext.getTargetListManager();
+        List<Target> joinPushable = Lists.newArrayList();
+        List<Integer> joinPushableId = Lists.newArrayList();
+        // collect each non-field target without an aggregation function that
+        // can be evaluated at this join
+        EvalNode expr;
+        for (int i = 0; i < targets.size(); i++) {
+          expr = targets.getTarget(i).getEvalTree();
+          if (canBeEvaluated(expr, joinNode)
+              && EvalTreeUtil.findDistinctAggFunction(expr).size() == 0
+              && expr.getType() != EvalNode.Type.FIELD) {
+            targets.setEvaluated(i);
+            joinPushable.add(targets.getTarget(i));
+            joinPushableId.add(i);
+          }
+        }
+        // NOTE(review): the whole target list is set here, not only the
+        // pushable targets collected above - confirm that it is intended.
+        if (joinPushable.size() > 0) {
+          joinNode.setTargetList(targets.targets);
+        }
+
+        // output schema: the input columns that the target list still needs,
+        // the columns of the targets evaluated here, and the columns
+        // required by the nodes above
+        Schema outSchema = shrinkOutSchema(joinNode.getInSchema(), targets.getUpdatedSchema().getColumns());
+        for (Integer t : joinPushableId) {
+          outSchema.addColumn(targets.getEvaluatedColumn(t));
+        }
+        outSchema = SchemaUtil.mergeAllWithNoDup(outSchema.getColumns(),
+            SchemaUtil.getProjectedSchema(joinNode.getInSchema(),parentNecessary).getColumns());
+        joinNode.setOutSchema(outSchema);
+        break;
+      }
+
+      case UNION:  // It does not support the projection
+        UnionNode unionNode = (UnionNode) node;
+        stack.add(unionNode);
+
+        ParseTree tree =  ctx.getParseTree();
+        if (tree instanceof CreateTableStmt) {
+          tree = ((CreateTableStmt) tree).getSelectStmt();
+        }
+        QueryBlock block = (QueryBlock) tree;
+
+        // each branch of the union tracks its evaluated targets in a context of its own
+        OptimizationContext outerCtx = new OptimizationContext(ctx,
+            block.getTargetList());
+        OptimizationContext innerCtx = new OptimizationContext(ctx,
+            block.getTargetList());
+        pushProjectionRecursive(ctx, outerCtx, unionNode.getOuterNode(),
+            stack, necessary);
+        pushProjectionRecursive(ctx, innerCtx, unionNode.getInnerNode(),
+            stack, necessary);
+        stack.pop();
+
+        // if this is the final union, we assume that all targets are evaluated
+        // TODO - is it always correct?
+        if (stack.peek().getType() != ExprType.UNION) {
+          optContext.getTargetListManager().setEvaluatedAll();
+        }
+        break;
+
+      case SCAN: {
+        ScanNode scanNode = (ScanNode) node;
+        TargetListManager targets = optContext.getTargetListManager();
+        List<Integer> scanPushableId = Lists.newArrayList();
+        List<Target> scanPushable = Lists.newArrayList();
+        EvalNode expr;
+        for (int i = 0; i < targets.size(); i++) {
+          expr = targets.getTarget(i).getEvalTree();
+          if (!targets.isEvaluated(i) && canBeEvaluated(expr, scanNode)) {
+            if (expr.getType() == EvalNode.Type.FIELD) {
+              // a plain field is marked as evaluated without being added to the scan targets
+              targets.setEvaluated(i);
+            } else if (EvalTreeUtil.findDistinctAggFunction(expr).size() == 0) {
+              // an expression without an aggregation function is evaluated by the scan
+              targets.setEvaluated(i);
+              scanPushable.add(targets.getTarget(i));
+              scanPushableId.add(i);
+            }
+          }
+        }
+
+        if (scanPushable.size() > 0) {
+          scanNode.setTargets(scanPushable.toArray(new Target[scanPushable.size()]));
+        }
+        // output schema: the input columns that the target list still needs,
+        // the columns of the targets evaluated here, and the columns
+        // required by the nodes above
+        Schema outSchema = shrinkOutSchema(scanNode.getInSchema(), targets.getUpdatedSchema().getColumns());
+        for (Integer t : scanPushableId) {
+          outSchema.addColumn(targets.getEvaluatedColumn(t));
+        }
+        outSchema = SchemaUtil.mergeAllWithNoDup(outSchema.getColumns(), SchemaUtil.getProjectedSchema(scanNode.getInSchema(),necessary).getColumns());
+        scanNode.setOutSchema(outSchema);
+
+        break;
+      }
+
+      default:
+    }
+
+    return node;
+  }
+
+  private static Schema shrinkOutSchema(Schema inSchema, Collection<Column> necessary) {
+    Schema projected = new Schema();
+    for(Column col : inSchema.getColumns()) {
+      if(necessary.contains(col)) {
+        projected.addColumn(col);
+      }
+    }
+    return projected;
+  }
+
+  /**
+   * Holds the planning context together with the target list manager that
+   * is created for a target list.
+   */
+  public static class OptimizationContext {
+    PlanningContext context;
+    TargetListManager targetListManager;
+
+    /**
+     * @param context the planning context
+     * @param targets the target list to be managed
+     */
+    public OptimizationContext(PlanningContext context, Target [] targets) {
+      this.targetListManager = new TargetListManager(context, targets);
+      this.context = context;
+    }
+
+    /** Returns the manager of the target list given at construction. */
+    public TargetListManager getTargetListManager() {
+      return targetListManager;
+    }
+  }
+
+  /**
+   * Keeps a target list together with one flag per target that tells
+   * whether the target is already evaluated.
+   */
+  public static class TargetListManager {
+    private final PlanningContext context;
+    private final boolean [] evaluated;
+    private final Target [] targets;
+
+    /**
+     * @param context the planning context, used to generate column names
+     * @param targets the target list; null is accepted, in which case
+     *                size() is zero and getTargets() returns null
+     */
+    public TargetListManager(PlanningContext context, Target [] targets) {
+      this.context = context;
+      this.targets = targets;
+      this.evaluated = new boolean[targets == null ? 0 : targets.length];
+    }
+
+    /** Returns the target at the given position of the target list. */
+    public Target getTarget(int id) {
+      return targets[id];
+    }
+
+    /** Returns the target list as given at construction. */
+    public Target [] getTargets() {
+      return this.targets;
+    }
+
+    /** Returns the number of targets; zero if the target list is null. */
+    public int size() {
+      // There is one flag per target, and the flag array also exists for a
+      // null target list, for which targets.length would fail.
+      return evaluated.length;
+    }
+
+    /** Marks the target at the given position as evaluated. */
+    public void setEvaluated(int id) {
+      evaluated[id] = true;
+    }
+
+    /** Marks all targets as evaluated. */
+    public void setEvaluatedAll() {
+      Arrays.fill(evaluated, true);
+    }
+
+    /** Tells whether the target at the given position is evaluated. */
+    private boolean isEvaluated(int id) {
+      return evaluated[id];
+    }
+
+    /**
+     * Builds the target list as seen above the operators that evaluated targets.
+     *
+     * @return a new array in which each evaluated target is replaced with a
+     *         field reference to its evaluated column, and each other target
+     *         is a clone of the original target
+     * @throws CloneNotSupportedException if a target cannot be cloned
+     */
+    public Target [] getUpdatedTarget() throws CloneNotSupportedException {
+      Target [] updated = new Target[evaluated.length];
+      for (int i = 0; i < evaluated.length; i++) {
+        if (evaluated[i]) {
+          Column col = getEvaluatedColumn(i);
+          updated[i] = new Target(new FieldEval(col), i);
+        } else {
+          updated[i] = (Target) targets[i].clone();
+        }
+      }
+      return updated;
+    }
+
+    /**
+     * Builds the schema that corresponds to the current state of the targets.
+     *
+     * @return a new schema that has the evaluated column of each evaluated
+     *         target, and the columns referred to by each other target
+     */
+    public Schema getUpdatedSchema() {
+      Schema schema = new Schema();
+      for (int i = 0; i < evaluated.length; i++) {
+        if (evaluated[i]) {
+          schema.addColumn(getEvaluatedColumn(i));
+        } else {
+          // a column referred to by several targets is added only once
+          for (Column col : getColumnRefs(i)) {
+            if (!schema.contains(col.getQualifiedName())) {
+              schema.addColumn(col);
+            }
+          }
+        }
+      }
+
+      return schema;
+    }
+
+    /** Returns the distinct columns that the target at the given position refers to. */
+    public Collection<Column> getColumnRefs(int id) {
+      return EvalTreeUtil.findDistinctRefColumns(targets[id].getEvalTree());
+    }
+
+    /**
+     * Builds the column that holds the result of the target at the given position.
+     * The column is named after the alias of the target if there is one;
+     * if the name of the eval tree is "?", a name generated by the planning
+     * context is used; otherwise, the name of the eval tree is used.
+     *
+     * @param id the position of the target
+     * @return a new column with the first value type of the eval tree
+     */
+    public Column getEvaluatedColumn(int id) {
+      Target t = targets[id];
+      String name;
+      if (t.hasAlias()) {
+        name = t.getAlias();
+      } else if (t.getEvalTree().getName().equals("?")) {
+        name = context.getGeneratedColumnName();
+      } else {
+        name = t.getEvalTree().getName();
+      }
+      return new Column(name, t.getEvalTree().getValueType()[0]);
+    }
+
+    /** Tells whether all targets are evaluated; true for an empty target list. */
+    public boolean isAllEvaluated() {
+      for (boolean isEval : evaluated) {
+        if (!isEval) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
new file mode 100644
index 0000000..a313abb
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -0,0 +1,687 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.parser.*;
+import org.apache.tajo.engine.parser.QueryBlock.*;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.logical.join.Edge;
+import org.apache.tajo.engine.planner.logical.join.JoinTree;
+import org.apache.tajo.engine.query.exception.InvalidQueryException;
+import org.apache.tajo.engine.query.exception.NotSupportQueryException;
+import org.apache.tajo.engine.utils.SchemaUtil;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+/**
+ * This class creates a logical plan from a parse tree ({@link org.apache.tajo.engine.parser.QueryBlock})
+ * generated by {@link org.apache.tajo.engine.parser.QueryAnalyzer}.
+ *
+ * @see org.apache.tajo.engine.parser.QueryBlock
+ */
+public class LogicalPlanner {
+  /** Logger used by the planning routines of this class. */
+  private static final Log LOG = LogFactory.getLog(LogicalPlanner.class);
+  /** Catalog consulted for table descriptions, e.g. when planning CREATE INDEX. */
+  private final CatalogService catalog;
+
+  /**
+   * Creates a logical planner.
+   *
+   * @param catalog the catalog service used to look up table descriptions
+   */
+  public LogicalPlanner(CatalogService catalog) {
+    this.catalog = catalog;
+  }
+
+  /**
+   * Generates the initial logical plan for the parse tree held by the
+   * planning context and places it under a {@link LogicalRootNode}.
+   *
+   * @param context the planning context that provides the parse tree
+   * @return the root node of the initial logical plan
+   * @throws InvalidQueryException if planning fails with a
+   *         {@link CloneNotSupportedException}
+   */
+  public LogicalNode createPlan(PlanningContext context) {
+    final LogicalNode child;
+    try {
+      child = createPlanInternal(context, context.getParseTree());
+    } catch (CloneNotSupportedException cloneFailure) {
+      throw new InvalidQueryException(cloneFailure);
+    }
+
+    // The root simply passes the schema of its child through.
+    final LogicalRootNode rootNode = new LogicalRootNode();
+    rootNode.setInSchema(child.getOutSchema());
+    rootNode.setOutSchema(child.getOutSchema());
+    rootNode.setSubNode(child);
+    return rootNode;
+  }
+  
+  /**
+   * Dispatches a parse tree to the plan builder that matches its statement type.
+   *
+   * @param ctx the planning context
+   * @param query the parse tree to plan
+   * @return the logical plan built for the statement
+   * @throws CloneNotSupportedException if a builder fails to clone a plan element
+   * @throws NotSupportQueryException if no builder exists for the statement type
+   */
+  private LogicalNode createPlanInternal(PlanningContext ctx,
+      ParseTree query) throws CloneNotSupportedException {
+    switch (query.getType()) {
+    case SELECT:
+      LOG.info("Planning select statement");
+      return buildSelectPlan(ctx, (QueryBlock) query);
+
+    case UNION:
+    case EXCEPT:
+    case INTERSECT:
+      return buildSetPlan(ctx, (SetStmt) query);
+
+    case CREATE_INDEX:
+      LOG.info("Planning create index statement");
+      return buildCreateIndexPlan((CreateIndexStmt) query);
+
+    case CREATE_TABLE:
+    case CREATE_TABLE_AS:
+      LOG.info("Planning store statement");
+      return buildCreateTablePlan(ctx, (CreateTableStmt) query);
+
+    default:
+      throw new NotSupportQueryException(query.toString());
+    }
+  }
+
+  private LogicalNode buildSetPlan(PlanningContext ctx, SetStmt stmt)
+      throws CloneNotSupportedException {
+    BinaryNode bin;
+    switch (stmt.getType()) {
+    case UNION:
+      bin = new UnionNode();
+      break;
+    case EXCEPT:
+      bin = new ExceptNode();
+      break;
+    case INTERSECT:
+      bin = new IntersectNode();
+      break;
+    default:
+      throw new IllegalStateException("the statement cannot be matched to any set operation type");
+    }
+    
+    bin.setOuter(createPlanInternal(ctx, stmt.getLeftTree()));
+    bin.setInner(createPlanInternal(ctx, stmt.getRightTree()));
+    bin.setInSchema(bin.getOuterNode().getOutSchema());
+    bin.setOutSchema(bin.getOuterNode().getOutSchema());
+    return bin;
+  }
+
+  /**
+   * Builds the plan of a CREATE INDEX statement: an index-write node on top
+   * of a scan of the indexed table. Both nodes use the table schema as
+   * input and output schema.
+   *
+   * @param stmt the create index statement
+   * @return the index-write node
+   */
+  private LogicalNode buildCreateIndexPlan(CreateIndexStmt stmt) {
+    FromTable indexedTable = new FromTable(catalog.getTableDesc(stmt.getTableName()));
+
+    ScanNode tableScan = new ScanNode(indexedTable);
+    tableScan.setInSchema(indexedTable.getSchema());
+    tableScan.setOutSchema(indexedTable.getSchema());
+
+    IndexWriteNode writer = new IndexWriteNode(stmt);
+    writer.setSubNode(tableScan);
+    writer.setInSchema(tableScan.getOutSchema());
+    writer.setOutSchema(tableScan.getOutSchema());
+    return writer;
+  }
+
+  /**
+   * Builds the plan of a CREATE TABLE or CREATE TABLE AS statement.
+   * A statement with a query block becomes a store node on top of the
+   * select plan; otherwise a plain create-table node is produced. When the
+   * statement names no store type, CSV is used.
+   *
+   * @param ctx the planning context
+   * @param query the create table statement
+   * @return a {@link StoreTableNode} or a {@link CreateTableNode}
+   * @throws CloneNotSupportedException if planning the select statement fails to clone a plan element
+   */
+  private static LogicalNode buildCreateTablePlan(final PlanningContext ctx,
+                                                  final CreateTableStmt query)
+      throws CloneNotSupportedException {
+    if (!query.hasQueryBlock()) {
+      CreateTableNode createNode =
+          new CreateTableNode(query.getTableName(), query.getTableDef());
+
+      // default type
+      // TODO - it should be configurable.
+      createNode.setStorageType(
+          query.hasStoreType() ? query.getStoreType() : CatalogProtos.StoreType.CSV);
+      if (query.hasOptions()) {
+        createNode.setOptions(query.getOptions());
+      }
+      if (query.hasPath()) {
+        createNode.setPath(query.getPath());
+      }
+      return createNode;
+    }
+
+    LogicalNode selectPlan = buildSelectPlan(ctx, query.getSelectStmt());
+    StoreTableNode storeNode = new StoreTableNode(query.getTableName());
+    storeNode.setSubNode(selectPlan);
+
+    // An explicit table definition wins over the schema of the select plan.
+    // TODO - strip qualified name
+    storeNode.setOutSchema(
+        query.hasDefinition() ? query.getTableDef() : selectPlan.getOutSchema());
+    storeNode.setInSchema(selectPlan.getOutSchema());
+
+    // default type
+    // TODO - it should be configurable.
+    storeNode.setStorageType(
+        query.hasStoreType() ? query.getStoreType() : CatalogProtos.StoreType.CSV);
+    if (query.hasOptions()) {
+      storeNode.setOptions(query.getOptions());
+    }
+    return storeNode;
+  }
+  
+  /**
+   * ^(SELECT from_clause? where_clause? groupby_clause? selectList)
+   * 
+   * @param query
+   * @return the planed logical plan
+   */
+  private static LogicalNode buildSelectPlan(PlanningContext ctx,
+                                             QueryBlock query)
+      throws CloneNotSupportedException {
+    LogicalNode subroot;
+    EvalNode whereCondition = null;
+    EvalNode [] cnf = null;
+    if(query.hasWhereClause()) {
+      whereCondition = query.getWhereCondition();
+      whereCondition = AlgebraicUtil.simplify(whereCondition);
+      cnf = EvalTreeUtil.getConjNormalForm(whereCondition);
+    }
+
+    if(query.hasFromClause()) {
+      if (query.hasExplicitJoinClause()) {
+        subroot = createExplicitJoinTree(query);
+      } else {
+        subroot = createImplicitJoinTree(query.getFromTables(), cnf);
+      }
+    } else {
+      subroot = new EvalExprNode(query.getTargetList());
+      subroot.setOutSchema(getProjectedSchema(ctx, query.getTargetList()));
+      return subroot;
+    }
+    
+    if(whereCondition != null) {
+      SelectionNode selNode = 
+          new SelectionNode(query.getWhereCondition());
+      selNode.setSubNode(subroot);
+      selNode.setInSchema(subroot.getOutSchema());
+      selNode.setOutSchema(selNode.getInSchema());
+      subroot = selNode;
+    }
+    
+    if(query.hasAggregation()) {
+      if (query.isDistinct()) {
+        throw new InvalidQueryException("Cannot support GROUP BY queries with distinct keyword");
+      }
+
+      GroupbyNode groupbyNode = null;
+      if (query.hasGroupbyClause()) {
+        if (query.getGroupByClause().getGroupSet().get(0).getType() == GroupType.GROUPBY) {          
+          groupbyNode = new GroupbyNode(query.getGroupByClause().getGroupSet().get(0).getColumns());
+          groupbyNode.setTargetList(query.getTargetList());
+          groupbyNode.setSubNode(subroot);
+          groupbyNode.setInSchema(subroot.getOutSchema());
+          Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
+          groupbyNode.setOutSchema(outSchema);
+          subroot = groupbyNode;
+        } else if (query.getGroupByClause().getGroupSet().get(0).getType() == GroupType.CUBE) {
+          LogicalNode union = createGroupByUnionByCube(ctx, query,
+              subroot, query.getGroupByClause());
+          Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
+          union.setOutSchema(outSchema);
+          subroot = union;
+        }
+        if(query.hasHavingCond())
+          groupbyNode.setHavingCondition(query.getHavingCond());
+      } else {
+        // when aggregation functions are used without grouping fields
+        groupbyNode = new GroupbyNode(new Column[] {});
+        groupbyNode.setTargetList(query.getTargetList());
+        groupbyNode.setSubNode(subroot);
+        groupbyNode.setInSchema(subroot.getOutSchema());
+        Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
+        groupbyNode.setOutSchema(outSchema);
+        subroot = groupbyNode;
+      }
+    }
+    
+    if(query.hasOrderByClause()) {
+      SortNode sortNode = new SortNode(query.getSortKeys());
+      sortNode.setSubNode(subroot);
+      sortNode.setInSchema(subroot.getOutSchema());
+      sortNode.setOutSchema(sortNode.getInSchema());
+      subroot = sortNode;
+    }
+
+    ProjectionNode prjNode;
+    if (query.getProjectAll()) {
+      Schema merged = SchemaUtil.merge(query.getFromTables());
+      Target [] allTargets = PlannerUtil.schemaToTargets(merged);
+      prjNode = new ProjectionNode(allTargets);
+      prjNode.setSubNode(subroot);
+      prjNode.setInSchema(merged);
+      prjNode.setOutSchema(merged);
+      subroot = prjNode;
+      query.setTargetList(allTargets);
+    } else {
+      prjNode = new ProjectionNode(query.getTargetList());
+      if (subroot != null) { // false if 'no from' statement
+        prjNode.setSubNode(subroot);
+      }
+      prjNode.setInSchema(subroot.getOutSchema());
+
+      // All aggregate functions are evaluated before the projection.
+      // So, the targets for aggregate functions should be updated.
+      LogicalOptimizer.TargetListManager tlm = new LogicalOptimizer.
+          TargetListManager(ctx, query.getTargetList());
+      for (int i = 0; i < tlm.getTargets().length; i++) {
+        if (EvalTreeUtil.findDistinctAggFunction(tlm.getTarget(i).getEvalTree()).size() > 0) {
+          tlm.setEvaluated(i);
+        }
+      }
+      prjNode.setTargetList(tlm.getUpdatedTarget());
+      Schema projected = getProjectedSchema(ctx, tlm.getUpdatedTarget());
+      prjNode.setOutSchema(projected);
+      subroot = prjNode;
+    }
+
+    GroupbyNode dupRemoval;
+    if (query.isDistinct()) {
+      dupRemoval = new GroupbyNode(subroot.getOutSchema().toArray());
+      dupRemoval.setTargetList(query.getTargetList());
+      dupRemoval.setSubNode(subroot);
+      dupRemoval.setInSchema(subroot.getOutSchema());
+      Schema outSchema = getProjectedSchema(ctx, query.getTargetList());
+      dupRemoval.setOutSchema(outSchema);
+      subroot = dupRemoval;
+    }
+
+    if (query.hasLimitClause()) {
+      LimitNode limitNode = new LimitNode(query.getLimitClause());
+      limitNode.setSubNode(subroot);
+      limitNode.setInSchema(subroot.getOutSchema());
+      limitNode.setOutSchema(limitNode.getInSchema());
+      subroot = limitNode;
+    }
+    
+    return subroot;
+  }
+
+  /**
+   * Plans a CUBE grouping as a union of group-by nodes, one per cuboid of
+   * the columns of the first group element.
+   *
+   * @param context the planning context
+   * @param queryBlock the query block that supplies the target list
+   * @param subNode the plan placed (as a clone) below every group-by node
+   * @param clause the group-by clause whose first element holds the cube columns
+   * @return the union of all cuboid group-by nodes
+   */
+  public static LogicalNode createGroupByUnionByCube(
+      final PlanningContext context,
+      final QueryBlock queryBlock,
+      final LogicalNode subNode,
+      final GroupByClause clause) {
+    Column [] cubeColumns = clause.getGroupSet().get(0).getColumns();
+    List<Column []> cuboids = generateCuboids(cubeColumns);
+    return createGroupByUnion(context, queryBlock, subNode, cuboids, 0);
+  }
+
+  /**
+   * Clones every target of the given array into a new array of the same length.
+   *
+   * @param srcs the targets to clone
+   * @return a new array holding the clones in the same order
+   * @throws CloneNotSupportedException if a target cannot be cloned
+   */
+  private static Target [] cloneTargets(Target [] srcs)
+      throws CloneNotSupportedException {
+    Target [] copies = new Target[srcs.length];
+    int pos = 0;
+    for (Target src : srcs) {
+      copies[pos++] = (Target) src.clone();
+    }
+    return copies;
+  }
+
+  /**
+   * Recursively builds a right-nested chain of union nodes over the
+   * group-by nodes of the cuboids starting at {@code idx}. While more than
+   * two cuboids remain, the current cuboid is united with the union of the
+   * rest; the last two cuboids are united directly.
+   *
+   * @param context the planning context
+   * @param queryBlock the query block that supplies the target list
+   * @param subNode the plan cloned below every group-by node
+   * @param cuboids the grouping column sets, one per group-by node
+   * @param idx index of the first cuboid handled by this call
+   * @return the union node covering the cuboids from {@code idx} on
+   * @throws InvalidQueryException if a target or the sub plan cannot be cloned
+   */
+  private static UnionNode createGroupByUnion(final PlanningContext context,
+                                              final QueryBlock queryBlock,
+                                              final LogicalNode subNode,
+                                              final List<Column []> cuboids,
+                                              final int idx) {
+    try {
+      GroupbyNode first = createCuboidGroupby(context, queryBlock, subNode, cuboids.get(idx));
+
+      UnionNode union;
+      if ((cuboids.size() - idx) > 2) {
+        union = new UnionNode(first, createGroupByUnion(context, queryBlock,
+            subNode, cuboids, idx + 1));
+      } else {
+        GroupbyNode second =
+            createCuboidGroupby(context, queryBlock, subNode, cuboids.get(idx + 1));
+        union = new UnionNode(first, second);
+      }
+      union.setInSchema(first.getOutSchema());
+      union.setOutSchema(first.getOutSchema());
+      return union;
+    } catch (CloneNotSupportedException cnse) {
+      LOG.error(cnse);
+      throw new InvalidQueryException(cnse);
+    }
+  }
+
+  /**
+   * Creates the group-by node of one cuboid on top of a clone of the sub plan.
+   * The node takes its input schema from its own sub node (the former code
+   * used the sub node of a sibling group-by node for the second cuboid).
+   */
+  private static GroupbyNode createCuboidGroupby(final PlanningContext context,
+                                                 final QueryBlock queryBlock,
+                                                 final LogicalNode subNode,
+                                                 final Column [] cuboid)
+      throws CloneNotSupportedException {
+    GroupbyNode groupby = new GroupbyNode(cuboid);
+    groupby.setTargetList(cloneTargets(queryBlock.getTargetList()));
+    groupby.setSubNode((LogicalNode) subNode.clone());
+    groupby.setInSchema(groupby.getSubNode().getOutSchema());
+    groupby.setOutSchema(getProjectedSchema(context, queryBlock.getTargetList()));
+    return groupby;
+  }
+  
+  public static final Column [] ALL 
+    = Lists.newArrayList().toArray(new Column[0]);
+  
+  /**
+   * Enumerates every subset (cuboid) of the given columns. The first
+   * element is {@link #ALL}; the remaining subsets follow in increasing
+   * order of a bit mask in which bit {@code j} selects {@code columns[j]}.
+   *
+   * @param columns the cube columns
+   * @return the list of cuboids, each as an array of columns
+   */
+  public static List<Column []> generateCuboids(Column [] columns) {
+    final int columnCount = columns.length;
+    final int cuboidCount = (int) Math.pow(2, columnCount);
+
+    List<Column []> cuboids = Lists.newArrayList();
+    cuboids.add(ALL);
+
+    int mask = 1;
+    while (mask < cuboidCount) {
+      List<Column> selected = Lists.newArrayList();
+      for (int pos = 0; pos < columnCount; pos++) {
+        if ((mask & (1 << pos)) != 0) {
+          selected.add(columns[pos]);
+        }
+      }
+      cuboids.add(selected.toArray(new Column[selected.size()]));
+      mask++;
+    }
+    return cuboids;
+  }
+
+  /**
+   * Builds the join tree for the explicit join clause of a query block.
+   *
+   * @param block the query block that holds the join clause
+   * @return the root of the join tree
+   */
+  private static LogicalNode createExplicitJoinTree(QueryBlock block) {
+    JoinClause rootClause = block.getJoinClause();
+    return createExplicitJoinTree_(rootClause);
+  }
+  
+  private static LogicalNode createExplicitJoinTree_(JoinClause joinClause) {
+
+    JoinNode join;
+    if (joinClause.hasLeftJoin()) {
+      LogicalNode outer = createExplicitJoinTree_(joinClause.getLeftJoin());
+      join = new JoinNode(joinClause.getJoinType(), outer);
+      join.setInner(new ScanNode(joinClause.getRight()));
+    } else {
+      join = new JoinNode(joinClause.getJoinType(), new ScanNode(joinClause.getLeft()),
+          new ScanNode(joinClause.getRight()));
+    }
+    if (joinClause.hasJoinQual()) {
+      join.setJoinQual(joinClause.getJoinQual());
+    } else if (joinClause.hasJoinColumns()) { 
+      // for using clause of explicit join
+      // TODO - to be implemented. Now, tajo only support 'ON' join clause.
+    }
+    
+    // Determine Join Schemas
+    Schema merged;
+    if (joinClause.isNatural()) {
+      merged = getNaturalJoin(join.getOuterNode(), join.getInnerNode());
+    } else {
+      merged = SchemaUtil.merge(join.getOuterNode().getOutSchema(),
+          join.getInnerNode().getOutSchema());
+    }
+    
+    join.setInSchema(merged);
+    join.setOutSchema(merged);
+    
+    // Determine join quals
+    // if natural join, should have the equi join conditions on common columns
+    if (joinClause.isNatural()) {
+      Schema leftSchema = join.getOuterNode().getOutSchema();
+      Schema rightSchema = join.getInnerNode().getOutSchema();
+      Schema commons = SchemaUtil.getCommons(
+          leftSchema, rightSchema);
+      EvalNode njCond = getNaturalJoinCondition(leftSchema, rightSchema, commons);
+      join.setJoinQual(njCond);
+    } else if (joinClause.hasJoinQual()) { 
+      // otherwise, the given join conditions are set
+      join.setJoinQual(joinClause.getJoinQual());
+    }
+    
+    return join;
+  }
+  
+  /**
+   * Builds the condition of a natural join: one equality per common column
+   * between the same-named columns of both schemas, combined with AND.
+   *
+   * @param outer the schema of the outer (left) side
+   * @param inner the schema of the inner (right) side
+   * @param commons the columns common to both schemas
+   * @return the combined condition, or {@code null} when there is no common column
+   */
+  private static EvalNode getNaturalJoinCondition(Schema outer, Schema inner, Schema commons) {
+    EvalNode conjunction = null;
+
+    for (Column shared : commons.getColumns()) {
+      Column leftKey = outer.getColumnByName(shared.getColumnName());
+      Column rightKey = inner.getColumnByName(shared.getColumnName());
+      EvalNode equality = new BinaryEval(EvalNode.Type.EQUAL,
+          new FieldEval(leftKey), new FieldEval(rightKey));
+      conjunction = (conjunction == null)
+          ? equality
+          : new BinaryEval(EvalNode.Type.AND, conjunction, equality);
+    }
+
+    return conjunction;
+  }
+  
+  /**
+   * Builds the join tree of a FROM clause without explicit joins. With
+   * search conditions the join order is inferred from them; without any,
+   * the tables are joined in the given order.
+   *
+   * @param tables the tables of the FROM clause
+   * @param cnf the search conditions, or {@code null} if there are none
+   * @return the root of the join tree
+   */
+  private static LogicalNode createImplicitJoinTree(FromTable [] tables,
+                                                    EvalNode [] cnf) {
+    return (cnf != null)
+        ? createCrossJoinFromJoinCondition(tables, cnf)
+        : createCatasianProduct(tables);
+  }
+
+  /**
+   * Builds a left-deep chain of CROSS_JOIN nodes over the given tables,
+   * using the join qualifiers found in {@code cnf} only to choose an
+   * initial join order. The qualifiers themselves are not attached to the
+   * join nodes created here.
+   *
+   * <p>Tables that appear in some join qualifier are ordered by a
+   * depth-first traversal of the join tree; all remaining tables are
+   * appended afterwards as further cross joins.
+   *
+   * @param tables the tables of the FROM clause
+   * @param cnf the search conditions to inspect for join qualifiers
+   * @return the root of the resulting join plan
+   */
+  private static LogicalNode createCrossJoinFromJoinCondition(
+      FromTable [] tables, EvalNode [] cnf) {
+    // Index the tables by name so that edges of the join tree can be resolved.
+    Map<String, FromTable> fromTableMap = Maps.newHashMap();
+    for (FromTable f : tables) {
+      // TODO - to consider alias and self-join
+      fromTableMap.put(f.getTableName(), f);
+    }
+
+    JoinTree joinTree = new JoinTree(); // to infer join order
+    for (EvalNode expr : cnf) {
+      if (PlannerUtil.isJoinQual(expr)) {
+        joinTree.addJoin(expr);
+      }
+    }
+
+    List<String> remain = Lists.newArrayList(fromTableMap.keySet());
+    remain.removeAll(joinTree.getTables()); // only tables not matched to any join condition remain
+    List<Edge> joinOrder = null;
+    LogicalNode subroot = null;
+    JoinNode join;
+    Schema joinSchema;
+
+    // If at least one join matches one of the join conditions,
+    // we traverse the join tree in a depth-first manner and
+    // determine the initial join order. Here, we do not consider the join cost.
+    // The optimized join order will be considered in the optimizer.
+    if (joinTree.getJoinNum() > 0) {
+      Stack<String> stack = new Stack<String>();
+      Set<String> visited = Sets.newHashSet();
+
+
+      // initially, one table is pushed into the stack
+      String seed = joinTree.getTables().iterator().next();
+      stack.add(seed);
+
+      joinOrder = Lists.newArrayList();
+
+      while (!stack.empty()) {
+        String table = stack.pop();
+        if (visited.contains(table)) {
+          continue;
+        }
+        visited.add(table);
+
+        // 'joinOrder' will contain all tables corresponding to the given join conditions.
+        for (Edge edge : joinTree.getEdges(table)) {
+          if (!visited.contains(edge.getTarget()) && !edge.getTarget().equals(table)) {
+            stack.add(edge.getTarget());
+            joinOrder.add(edge);
+          }
+        }
+      }
+
+      // NOTE(review): joinOrder.get(0) assumes the traversal collected at least
+      // one edge - confirm this holds when the only join qualifier is a self-join.
+      subroot = new ScanNode(fromTableMap.get(joinOrder.get(0).getSrc()));
+      LogicalNode inner;
+      // Chain the targets of the collected edges onto the plan built so far.
+      for (Edge edge : joinOrder) {
+        inner = new ScanNode(fromTableMap.get(edge.getTarget()));
+        join = new JoinNode(JoinType.CROSS_JOIN, subroot, inner);
+        subroot = join;
+
+        joinSchema = SchemaUtil.merge(
+            join.getOuterNode().getOutSchema(),
+            join.getInnerNode().getOutSchema());
+        join.setInSchema(joinSchema);
+        join.setOutSchema(joinSchema);
+      }
+    }
+
+    // Here, there are two cases:
+    // 1) a join plan already exists.
+    // 2) there is no join plan.
+    if (joinOrder != null) { // case 1)
+      // if there are join tables corresponding to any join condition,
+      // the join plan is placed as the outer plan of the product.
+      remain.remove(joinOrder.get(0).getSrc());
+      remain.remove(joinOrder.get(0).getTarget());
+    } else { // case 2)
+      // if there are no inferred joins, one of the remaining tables is placed as the left table
+      subroot = new ScanNode(fromTableMap.get(remain.get(0)));
+      remain.remove(remain.get(0));
+    }
+
+    // Here, the variable 'remain' contains join tables which are not matched to any join conditions.
+    // Thus, they will be joined by Cartesian product
+    for (String table : remain) {
+      join = new JoinNode(JoinType.CROSS_JOIN,
+          subroot, new ScanNode(fromTableMap.get(table)));
+      joinSchema = SchemaUtil.merge(
+          join.getOuterNode().getOutSchema(),
+          join.getInnerNode().getOutSchema());
+      join.setInSchema(joinSchema);
+      join.setOutSchema(joinSchema);
+      subroot = join;
+    }
+
+    return subroot;
+  }
+
+  /**
+   * Joins the given tables, in order, as a left-deep chain of cross joins.
+   * A single table yields just its scan node.
+   *
+   * TODO - this method is somewhat duplicated to createCrossJoinFromJoinCondition. Later, it should be removed.
+   *
+   * @param tables the tables to join; must contain at least one table
+   * @return the root of the resulting plan
+   */
+  private static LogicalNode createCatasianProduct(FromTable [] tables) {
+    LogicalNode product = new ScanNode(tables[0]);
+    for (int i = 1; i < tables.length; i++) {
+      JoinNode crossJoin = new JoinNode(JoinType.CROSS_JOIN,
+          product, new ScanNode(tables[i]));
+      Schema merged = SchemaUtil.merge(
+          crossJoin.getOuterNode().getOutSchema(),
+          crossJoin.getInnerNode().getOutSchema());
+      crossJoin.setInSchema(merged);
+      crossJoin.setOutSchema(merged);
+      product = crossJoin;
+    }
+
+    return product;
+  }
+
+  public static Schema getProjectedSchema(PlanningContext context, Target [] targets) {
+    Schema projected = new Schema();
+    for(Target t : targets) {
+      DataType type = t.getEvalTree().getValueType()[0];
+      String name;
+      if (t.hasAlias()) {
+        name = t.getAlias();
+      } else if (t.getEvalTree().getName().equals("?")) {
+        name = context.getGeneratedColumnName();
+      } else {
+        name = t.getEvalTree().getName();
+      }
+      projected.addColumn(name,type);
+    }
+
+    return projected;
+  }
+  
+  /**
+   * Computes the schema of a natural join: the common columns first, then
+   * the remaining columns of the outer side, then the remaining columns of
+   * the inner side. Every non-common column is added exactly once.
+   *
+   * <p>The former nested loop added a column once for each common column
+   * with a different name, which duplicated columns when two or more
+   * columns were common, re-added common columns, and added nothing when
+   * there was no common column.
+   *
+   * @param outer the outer (left) child of the join
+   * @param inner the inner (right) child of the join
+   * @return the schema of the natural join
+   */
+  private static Schema getNaturalJoin(LogicalNode outer, LogicalNode inner) {
+    Schema commons = SchemaUtil.getCommons(outer.getOutSchema(),
+        inner.getOutSchema());
+
+    Schema joinSchema = new Schema();
+    joinSchema.addColumns(commons);
+    addNonCommonColumns(joinSchema, outer.getOutSchema(), commons);
+    addNonCommonColumns(joinSchema, inner.getOutSchema(), commons);
+    return joinSchema;
+  }
+
+  /**
+   * Appends to {@code target} every column of {@code source} whose column
+   * name matches none of the column names in {@code commons}.
+   */
+  private static void addNonCommonColumns(Schema target, Schema source, Schema commons) {
+    for (Column candidate : source.getColumns()) {
+      boolean isCommon = false;
+      for (Column common : commons.getColumns()) {
+        if (common.getColumnName().equals(candidate.getColumnName())) {
+          isCommon = true;
+          break;
+        }
+      }
+      if (!isCommon) {
+        target.addColumn(candidate);
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
new file mode 100644
index 0000000..e8bdbc5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlanner.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.planner;
+
+import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.exception.InternalException;
+
+/**
+ * This class generates a physical execution plan.
+ */
+public interface PhysicalPlanner {
+  public PhysicalExec createPlan(TaskAttemptContext context,
+                                 LogicalNode logicalPlan)
+      throws InternalException;
+}


Mime
View raw message