tajo-commits mailing list archives

From hyun...@apache.org
Subject [30/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)
Date Fri, 18 Apr 2014 10:31:42 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
new file mode 100644
index 0000000..8c55d7f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * It stores a sorted data set into a number of partition files. It assumes that input tuples are sorted in
+ * ascending or descending order of the partition columns.
+ */
+public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
+  private static final Log LOG = LogFactory.getLog(SortBasedColPartitionStoreExec.class);
+
+  private Tuple currentKey;
+  private Tuple prevKey;
+
+  private Appender appender;
+  private TableStats aggregated;
+
+  public SortBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
+      throws IOException {
+    super(context, plan, child);
+  }
+
+  public void init() throws IOException {
+    super.init();
+
+    currentKey = new VTuple(keyNum);
+    aggregated = new TableStats();
+  }
+
+  private Appender getAppender(String partition) throws IOException {
+    Path dataFile = getDataFile(partition);
+    FileSystem fs = dataFile.getFileSystem(context.getConf());
+
+    if (fs.exists(dataFile.getParent())) {
+      LOG.info("Path " + dataFile.getParent() + " already exists!");
+    } else {
+      fs.mkdirs(dataFile.getParent());
+      LOG.info("Add subpartition path directory :" + dataFile.getParent());
+    }
+
+    if (fs.exists(dataFile)) {
+      LOG.info("File " + dataFile + " already exists!");
+      FileStatus status = fs.getFileStatus(dataFile);
+      LOG.info("File size: " + status.getLen());
+    }
+
+    appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
+    appender.enableStats();
+    appender.init();
+
+    return appender;
+  }
+
+  private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) {
+    for (int i = 0; i < keyIds.length; i++) {
+      keyTuple.put(i, inTuple.get(keyIds[i]));
+    }
+  }
+
+  private String getSubdirectory(Tuple keyTuple) {
+    StringBuilder sb = new StringBuilder();
+
+    for(int i = 0; i < keyIds.length; i++) {
+      Datum datum = keyTuple.get(i);
+      if(i > 0) {
+        sb.append("/");
+      }
+      sb.append(keyNames[i]).append("=");
+      sb.append(datum.asChars());
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    while((tuple = child.next()) != null) {
+
+      fillKeyTuple(tuple, currentKey);
+
+      if (prevKey == null || !prevKey.equals(currentKey)) {
+        if (appender != null) { // close the finished partition before opening the next one
+          appender.close();
+          StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
+        }
+
+        appender = getAppender(getSubdirectory(currentKey));
+        prevKey = new VTuple(currentKey);
+      }
+
+      appender.addTuple(tuple);
+    }
+
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (appender != null) {
+      appender.close();
+      StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
+      context.setResultStats(aggregated);
+    }
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+}
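
The executor above is the classic sort-based partition writer: because input arrives ordered by the partition key, only one appender needs to be open at a time, and the executor switches (and aggregates statistics) whenever the key prefix changes. A minimal standalone sketch of that key-change pattern follows; every class and name in it is an illustrative stand-in, not Tajo's API.

import java.util.Arrays;
import java.util.List;

// Sketch of the sort-based partition-writer pattern: input rows arrive sorted
// by the partition key, so one appender at a time suffices. Illustrative only.
public class SortedPartitionWriterSketch {
  public static void main(String[] args) {
    // Rows sorted by the partition key (the first element).
    List<String[]> rows = Arrays.asList(
        new String[]{"2014-04-17", "a"},
        new String[]{"2014-04-17", "b"},
        new String[]{"2014-04-18", "c"});

    String prevKey = null;
    for (String[] row : rows) {
      String currentKey = row[0];
      if (!currentKey.equals(prevKey)) {
        // The real executor closes the previous Appender here, aggregates its
        // statistics, and opens a new one under a "key=<value>" directory.
        System.out.println("open appender for directory: key=" + currentKey);
        prevKey = currentKey;
      }
      System.out.println("  append row: " + Arrays.toString(row));
    }
  }
}

The sorted-input assumption is what keeps the number of simultaneously open files at one; a hash-based variant would instead need one appender per distinct key.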

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
new file mode 100644
index 0000000..a4a8d37
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortExec.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.TupleComparator;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+public abstract class SortExec extends UnaryPhysicalExec {
+  private final Comparator<Tuple> comparator;
+  private final SortSpec [] sortSpecs;
+
+  public SortExec(TaskAttemptContext context, Schema inSchema,
+                  Schema outSchema, PhysicalExec child, SortSpec [] sortSpecs) {
+    super(context, inSchema, outSchema, child);
+    this.sortSpecs = sortSpecs;
+    this.comparator = new TupleComparator(inSchema, sortSpecs);
+  }
+
+  public SortSpec[] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public Comparator<Tuple> getComparator() {
+    return comparator;
+  }
+
+  @Override
+  public abstract Tuple next() throws IOException;
+}
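
Subclasses of SortExec rely on the comparator built here from the sort specs: compare tuples field by field, honoring each field's direction. The idea, sketched with plain Java types standing in for SortSpec and TupleComparator (Spec is a hypothetical name):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Stand-in for a comparator built from sort specs: compare field by field,
// honoring each field's direction. Spec is hypothetical, not Tajo's SortSpec.
public class SortSpecComparatorSketch {
  record Spec(int columnIndex, boolean ascending) {}

  static Comparator<Object[]> comparatorFor(List<Spec> specs) {
    return (a, b) -> {
      for (Spec s : specs) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        int cmp = ((Comparable) a[s.columnIndex()]).compareTo(b[s.columnIndex()]);
        if (cmp != 0) {
          return s.ascending() ? cmp : -cmp;
        }
      }
      return 0; // all key fields are equal
    };
  }

  public static void main(String[] args) {
    List<Object[]> tuples = new ArrayList<>(Arrays.asList(
        new Object[]{"b", 1}, new Object[]{"a", 2}, new Object[]{"a", 1}));
    // Sort by column 0 ascending, then column 1 descending.
    tuples.sort(comparatorFor(List.of(new Spec(0, true), new Spec(1, false))));
    tuples.forEach(t -> System.out.println(Arrays.toString(t)));
  }
}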

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
new file mode 100644
index 0000000..1f927a6
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.planner.logical.InsertNode;
+import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * A physical executor that stores part of a table into a specified storage.
+ */
+public class StoreTableExec extends UnaryPhysicalExec {
+  private PersistentStoreNode plan;
+  private Appender appender;
+  private Tuple tuple;
+
+  public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.plan = plan;
+  }
+
+  public void init() throws IOException {
+    super.init();
+
+    TableMeta meta;
+    if (plan.hasOptions()) {
+      meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
+    } else {
+      meta = CatalogUtil.newTableMeta(plan.getStorageType());
+    }
+
+    if (plan instanceof InsertNode) {
+      InsertNode insertNode = (InsertNode) plan;
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+          insertNode.getTableSchema(), context.getOutputPath());
+    } else {
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
+          context.getOutputPath());
+    }
+
+    appender.enableStats();
+    appender.init();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    while((tuple = child.next()) != null) {
+      appender.addTuple(tuple);
+    }
+        
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+
+  public void close() throws IOException {
+    super.close();
+
+    if(appender != null){
+      appender.flush();
+      appender.close();
+      // Collect statistics data
+      context.setResultStats(appender.getStats());
+      context.addShuffleFileOutput(0, context.getTaskId().toString());
+    }
+
+    appender = null;
+    plan = null;
+  }
+}
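
Note the contract implied by next() above: a store executor is a sink. It drains its child completely, appends every tuple, and returns null upward; its real output (statistics and shuffle-file registration) is published in close(). A toy illustration of that drain-then-null shape, with an Iterator standing in for the child executor:

import java.util.Iterator;
import java.util.List;

// Toy illustration of the "sink" contract: consume everything, emit nothing.
public class SinkPatternSketch {
  static String drain(Iterator<String> child, StringBuilder out) {
    while (child.hasNext()) {
      out.append(child.next()).append('\n'); // stands in for appender.addTuple()
    }
    return null; // a store executor never returns tuples upward
  }

  public static void main(String[] args) {
    StringBuilder sink = new StringBuilder();
    System.out.println(drain(List.of("t1", "t2").iterator(), sink)); // null
    System.out.print(sink); // t1, then t2
  }
}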

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TunnelExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TunnelExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TunnelExec.java
new file mode 100644
index 0000000..fffcc39
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/TunnelExec.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class TunnelExec extends UnaryPhysicalExec {
+
+  public TunnelExec(final TaskAttemptContext context,
+                     final Schema outputSchema,
+                     final PhysicalExec child) {
+    super(context, outputSchema, outputSchema, child);
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    return child.next();
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
new file mode 100644
index 0000000..ab67d7b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+public abstract class UnaryPhysicalExec extends PhysicalExec {
+  protected PhysicalExec child;
+  protected float progress;
+  protected TableStats inputStats;
+
+  public UnaryPhysicalExec(TaskAttemptContext context,
+                           Schema inSchema, Schema outSchema,
+                           PhysicalExec child) {
+    super(context, inSchema, outSchema);
+    this.child = child;
+  }
+
+  @SuppressWarnings("unchecked")
+  public <T extends PhysicalExec> T getChild() {
+    return (T) this.child;
+  }
+
+  @VisibleForTesting
+  public void setChild(PhysicalExec child){
+    this.child = child;
+  }
+
+  public void init() throws IOException {
+    progress = 0.0f;
+    if (child != null) {
+      child.init();
+    }
+  }
+
+  public void rescan() throws IOException {
+    progress = 0.0f;
+    if (child != null) {
+      child.rescan();
+    }
+  }
+
+  public void close() throws IOException {
+    progress = 1.0f;
+    if (child != null) {
+      child.close();
+      try {
+        TableStats stat = child.getInputStats();
+        if (stat != null) {
+          inputStats = (TableStats)(stat.clone());
+        }
+      } catch (CloneNotSupportedException e) {
+        e.printStackTrace();
+      }
+      child = null;
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    if (child != null) {
+      return child.getProgress();
+    } else {
+      return progress;
+    }
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    if (child != null) {
+      return child.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
+}
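
UnaryPhysicalExec delegates progress and input statistics to its child while the child is alive, then snapshots final values at close() before dropping the child reference. The same delegate-then-snapshot shape in isolation (all types below are illustrative, not Tajo classes):

// Delegate-while-open, snapshot-at-close: the shape UnaryPhysicalExec uses for
// progress and input statistics. All types here are illustrative.
public class DelegateSnapshotSketch {
  interface Child { float progress(); }

  static class Parent {
    private Child child;
    private float progress = 0.0f;

    Parent(Child child) { this.child = child; }

    float getProgress() {
      // While the child is alive, its progress is authoritative.
      return child != null ? child.progress() : progress;
    }

    void close() {
      progress = 1.0f; // snapshot the final value ...
      child = null;    // ... then drop the child reference
    }
  }

  public static void main(String[] args) {
    Parent p = new Parent(() -> 0.5f);
    System.out.println(p.getProgress()); // 0.5, delegated to the child
    p.close();
    System.out.println(p.getProgress()); // 1.0, the snapshot
  }
}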

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
new file mode 100644
index 0000000..497c6d3
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnionExec.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+public class UnionExec extends BinaryPhysicalExec {
+  private boolean nextOuter = true;
+  private Tuple tuple;
+
+  public UnionExec(TaskAttemptContext context, PhysicalExec outer, PhysicalExec inner) {
+    super(context, outer.getSchema(), inner.getSchema(), outer, inner);
+    if (!outer.getSchema().equals(inner.getSchema())) {
+      throw new InvalidQueryException(
+          "Both input schemas of a union must be the same");
+    }
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    if (nextOuter) {
+      tuple = leftChild.next();
+      if (tuple == null) {
+        nextOuter = false;
+      } else {
+        return tuple;
+      }
+    }
+    
+    return rightChild.next();
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    super.rescan();
+
+    nextOuter = true;
+  }
+}
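
UnionExec implements UNION ALL by concatenation: exhaust the left child, then serve tuples from the right. The equivalent logic over plain iterators (a hypothetical helper, not Tajo code):

import java.util.Iterator;
import java.util.List;

// Concatenating iterator: the same exhaust-left-then-right logic as UnionExec.
public class UnionAllSketch {
  static <T> Iterator<T> unionAll(Iterator<T> left, Iterator<T> right) {
    return new Iterator<T>() {
      private boolean onLeft = true;

      @Override
      public boolean hasNext() {
        if (onLeft && left.hasNext()) {
          return true;
        }
        onLeft = false; // mirrors nextOuter = false
        return right.hasNext();
      }

      @Override
      public T next() {
        return onLeft ? left.next() : right.next();
      }
    };
  }

  public static void main(String[] args) {
    Iterator<String> it = unionAll(List.of("a", "b").iterator(), List.of("c").iterator());
    it.forEachRemaining(System.out::println); // a, b, c
  }
}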

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java
new file mode 100644
index 0000000..3b4b712
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/BasicQueryRewriteEngine.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlanningException;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is a basic query rewrite rule engine. This rewrite rule engine
+ * rewrites a logical plan with various query rewrite rules.
+ */
+public class BasicQueryRewriteEngine implements QueryRewriteEngine {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(BasicQueryRewriteEngine.class);
+
+  /** a map for query rewrite rules  */
+  private Map<String, RewriteRule> rewriteRules = new LinkedHashMap<String, RewriteRule>();
+
+  /**
+   * Add a query rewrite rule to this engine.
+   *
+   * @param rule The rule to be added to this engine.
+   */
+  public void addRewriteRule(RewriteRule rule) {
+    if (!rewriteRules.containsKey(rule.getName())) {
+      rewriteRules.put(rule.getName(), rule);
+    }
+  }
+
+  /**
+   * Rewrite a logical plan with all query rewrite rules added to this engine.
+   *
+   * @param plan The plan to be rewritten with all query rewrite rules.
+   * @return The rewritten plan.
+   */
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    for (Entry<String, RewriteRule> rewriteRule : rewriteRules.entrySet()) {
+      RewriteRule rule = rewriteRule.getValue();
+      if (rule.isEligible(plan)) {
+        plan = rule.rewrite(plan);
+        LOG.info("The rule \"" + rule.getName() + "\" rewrites the query.");
+      }
+      }
+    }
+
+    return plan;
+  }
+}
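
The engine is an ordered rule pipeline: a LinkedHashMap keyed by rule name preserves registration order, and rewrite() applies each eligible rule in turn. A self-contained miniature of the same shape, with a String standing in for LogicalPlan:

import java.util.LinkedHashMap;
import java.util.Map;

// Miniature of the rule-pipeline shape: ordered registration, sequential
// application of eligible rules. "Plan" is a String here, not a LogicalPlan.
public class RewritePipelineSketch {
  interface Rule {
    String name();
    boolean isEligible(String plan);
    String rewrite(String plan);
  }

  private final Map<String, Rule> rules = new LinkedHashMap<>();

  void addRule(Rule rule) {
    rules.putIfAbsent(rule.name(), rule); // first registration wins; order is preserved
  }

  String rewrite(String plan) {
    for (Rule rule : rules.values()) {
      if (rule.isEligible(plan)) {
        plan = rule.rewrite(plan);
        System.out.println("The rule \"" + rule.name() + "\" rewrites the plan.");
      }
    }
    return plan;
  }

  public static void main(String[] args) {
    RewritePipelineSketch engine = new RewritePipelineSketch();
    engine.addRule(new Rule() {
      public String name() { return "UpperCase"; }
      public boolean isEligible(String plan) { return !plan.isEmpty(); }
      public String rewrite(String plan) { return plan.toUpperCase(); }
    });
    System.out.println(engine.rewrite("select * from t"));
  }
}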

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
new file mode 100644
index 0000000..63b426f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, LogicalNode> implements RewriteRule {
+  private static final String NAME = "FilterPushDown";
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      this.visit(new HashSet<EvalNode>(), plan, block, block.getRoot(), new Stack<LogicalNode>());
+    }
+
+    return plan;
+  }
+
+  @Override
+  public LogicalNode visitFilter(Set<EvalNode> cnf, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 SelectionNode selNode, Stack<LogicalNode> stack) throws PlanningException {
+    cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
+
+    stack.push(selNode);
+    visit(cnf, plan, block, selNode.getChild(), stack);
+    stack.pop();
+
+    if(cnf.size() == 0) { // remove the selection operator if there is no search condition after selection push.
+      LogicalNode node = stack.peek();
+      if (node instanceof UnaryNode) {
+        UnaryNode unary = (UnaryNode) node;
+        unary.setChild(selNode.getChild());
+      } else {
+        throw new InvalidQueryException("Unexpected Logical Query Plan");
+      }
+    } else { // if there remain search conditions
+
+      // check if it can be evaluated here
+      Set<EvalNode> matched = TUtil.newHashSet();
+      for (EvalNode eachEval : cnf) {
+        if (LogicalPlanner.checkIfBeEvaluatedAtThis(eachEval, selNode)) {
+          matched.add(eachEval);
+        }
+      }
+
+      // if there are search conditions which can be evaluated here, push them down and remove them from the cnf.
+      if (matched.size() > 0) {
+        selNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(matched.toArray(new EvalNode[matched.size()])));
+        cnf.removeAll(matched);
+      }
+    }
+
+    return selNode;
+  }
+
+  private boolean isOuterJoin(JoinType joinType) {
+    return joinType == JoinType.LEFT_OUTER || joinType == JoinType.RIGHT_OUTER || joinType==JoinType.FULL_OUTER;
+  }
+
+  @Override
+  public LogicalNode visitJoin(Set<EvalNode> cnf, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode joinNode,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode left = joinNode.getLeftChild();
+    LogicalNode right = joinNode.getRightChild();
+
+    // here we should stop selection pushdown on the null supplying side(s) of an outer join
+    // get the two operands of the join operation as well as the join type
+    JoinType joinType = joinNode.getJoinType();
+    EvalNode joinQual = joinNode.getJoinQual();
+    if (joinQual != null && isOuterJoin(joinType)) {
+
+      // if both are fields
+      if (joinQual.getLeftExpr().getType() == EvalType.FIELD && joinQual.getRightExpr().getType() == EvalType.FIELD) {
+
+        String leftTableName = ((FieldEval) joinQual.getLeftExpr()).getQualifier();
+        String rightTableName = ((FieldEval) joinQual.getRightExpr()).getQualifier();
+        List<String> nullSuppliers = Lists.newArrayList();
+        Set<String> leftTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
+            joinNode.getLeftChild()));
+        Set<String> rightTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
+            joinNode.getRightChild()));
+
+        // some verification
+        if (joinType == JoinType.FULL_OUTER) {
+          nullSuppliers.add(leftTableName);
+          nullSuppliers.add(rightTableName);
+
+          // verify that these null suppliers are indeed in the left and right sets
+          if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+          if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+
+        } else if (joinType == JoinType.LEFT_OUTER) {
+          nullSuppliers.add(((RelationNode)joinNode.getRightChild()).getCanonicalName());
+          //verify that this null supplier is indeed in the right sub-tree
+          if (!rightTableSet.contains(nullSuppliers.get(0))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+        } else if (joinType == JoinType.RIGHT_OUTER) {
+          if (((RelationNode)joinNode.getRightChild()).getCanonicalName().equals(rightTableName)) {
+            nullSuppliers.add(leftTableName);
+          } else {
+            nullSuppliers.add(rightTableName);
+          }
+
+          // verify that this null supplier is indeed in the left sub-tree
+          if (!leftTableSet.contains(nullSuppliers.get(0))) {
+            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
+          }
+        }
+
+        // retain in this outer join node's JoinQual those selection predicates
+        // related to the outer join's null supplier(s)
+        List<EvalNode> matched2 = Lists.newArrayList();
+        for (EvalNode eval : cnf) {
+
+          Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(eval);
+          Set<String> tableNames = Sets.newHashSet();
+          // getting distinct table references
+          for (Column col : columnRefs) {
+            tableNames.add(col.getQualifier()); // a Set already de-duplicates
+          }
+
+          // if the predicate involves any of the null suppliers
+          boolean shouldKeep = false;
+          for (String nullSupplier : nullSuppliers) {
+            if (tableNames.contains(nullSupplier)) {
+              shouldKeep = true;
+              break;
+            }
+          }
+
+          if (shouldKeep) {
+            matched2.add(eval);
+          }
+
+        }
+
+        // merge the retained predicates and install them in the current outer join node, then remove them from the cnf
+        EvalNode qual2 = null;
+        if (matched2.size() > 1) {
+          // merged into one eval tree
+          qual2 = AlgebraicUtil.createSingletonExprFromCNF(
+              matched2.toArray(new EvalNode[matched2.size()]));
+        } else if (matched2.size() == 1) {
+          // if the number of matched expr is one
+          qual2 = matched2.get(0);
+        }
+
+        if (qual2 != null) {
+          EvalNode conjQual2 = AlgebraicUtil.createSingletonExprFromCNF(joinNode.getJoinQual(), qual2);
+          joinNode.setJoinQual(conjQual2);
+          cnf.removeAll(matched2);
+        } // for the remaining cnf, push it as usual
+      }
+    }
+
+    if (joinNode.hasJoinQual()) {
+      cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
+    }
+
+    visit(cnf, plan, block, left, stack);
+    visit(cnf, plan, block, right, stack);
+
+    List<EvalNode> matched = Lists.newArrayList();
+    for (EvalNode eval : cnf) {
+      if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, eval, joinNode, stack.peek().getType() != NodeType.JOIN)) {
+        matched.add(eval);
+      }
+    }
+
+    EvalNode qual = null;
+    if (matched.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
+          matched.toArray(new EvalNode[matched.size()]));
+    } else if (matched.size() == 1) {
+      // if the number of matched expr is one
+      qual = matched.get(0);
+    }
+
+    if (qual != null) {
+      joinNode.setJoinQual(qual);
+
+      if (joinNode.getJoinType() == JoinType.CROSS) {
+        joinNode.setJoinType(JoinType.INNER);
+      }
+      cnf.removeAll(matched);
+    }
+
+    return joinNode;
+  }
+
+  @Override
+  public LogicalNode visitTableSubQuery(Set<EvalNode> cnf, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                        TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
+    List<EvalNode> matched = Lists.newArrayList();
+    for (EvalNode eval : cnf) {
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, node)) {
+        matched.add(eval);
+      }
+    }
+
+    Map<String, String> columnMap = new HashMap<String, String>();
+    for (int i = 0; i < node.getInSchema().size(); i++) {
+      LogicalNode childNode = node.getSubQuery();
+      if (childNode.getOutSchema().getColumn(i).hasQualifier()) {
+        columnMap.put(node.getInSchema().getColumn(i).getQualifiedName(),
+            childNode.getOutSchema().getColumn(i).getQualifiedName());
+      } else {
+        NamedExprsManager namedExprsMgr = plan.getBlock(node.getSubQuery()).getNamedExprsManager();
+        columnMap.put(node.getInSchema().getColumn(i).getQualifiedName(),
+          namedExprsMgr.getOriginalName(childNode.getOutSchema().getColumn(i).getQualifiedName()));
+      }
+    }
+
+    Set<EvalNode> transformed = new HashSet<EvalNode>();
+
+    // Rename from upper block's one to lower block's one
+    for (EvalNode matchedEval : matched) {
+      EvalNode copy;
+      try {
+        copy = (EvalNode) matchedEval.clone();
+      } catch (CloneNotSupportedException e) {
+        throw new PlanningException(e);
+      }
+
+      Set<Column> columns = EvalTreeUtil.findUniqueColumns(copy);
+      for (Column c : columns) {
+        if (columnMap.containsKey(c.getQualifiedName())) {
+          EvalTreeUtil.changeColumnRef(copy, c.getQualifiedName(), columnMap.get(c.getQualifiedName()));
+        } else {
+          throw new PlanningException(
+              "Invalid Filter PushDown on SubQuery: no corresponding column '"
+                  + c.getQualifiedName() + "'");
+        }
+      }
+
+      transformed.add(copy);
+    }
+
+    visit(transformed, plan, plan.getBlock(node.getSubQuery()));
+
+    cnf.removeAll(matched);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitScan(Set<EvalNode> cnf, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode,
+                               Stack<LogicalNode> stack) throws PlanningException {
+    List<EvalNode> matched = Lists.newArrayList();
+    for (EvalNode eval : cnf) {
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {
+        matched.add(eval);
+      }
+    }
+
+    EvalNode qual = null;
+    if (matched.size() > 1) {
+      // merged into one eval tree
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
+          matched.toArray(new EvalNode[matched.size()]));
+    } else if (matched.size() == 1) {
+      // if the number of matched expr is one
+      qual = matched.get(0);
+    }
+
+    if (qual != null) { // if a matched qual exists
+      scanNode.setQual(qual);
+    }
+
+    cnf.removeAll(matched);
+
+    return scanNode;
+  }
+}
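
The rule's overall mechanism: flatten each WHERE/ON predicate into its conjuncts (conjunctive normal form), carry that set down the plan, let each operator claim the conjuncts it can evaluate, and remove claimed conjuncts from the set so each is evaluated exactly once. A toy claim-and-remove loop over string predicates; the startsWith test is a stand-in for checkIfBeEvaluatedAtRelation:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy claim-and-remove pushdown: split a conjunction into conjuncts, let an
// "operator" claim the ones it can evaluate, and pass the rest downward.
public class ConjunctPushdownSketch {
  public static void main(String[] args) {
    // Conjuncts of: l_partkey > 1 AND l_orderkey = 7 AND o_custkey = 3
    Set<String> cnf = new HashSet<>(List.of("l_partkey>1", "l_orderkey=7", "o_custkey=3"));

    // Pretend a scan of "lineitem" can evaluate predicates over l_* columns.
    Set<String> claimed = new HashSet<>();
    for (String conjunct : cnf) {
      if (conjunct.startsWith("l_")) { // stand-in for checkIfBeEvaluatedAtRelation
        claimed.add(conjunct);
      }
    }
    cnf.removeAll(claimed); // each conjunct ends up evaluated exactly once

    System.out.println("pushed to the lineitem scan: " + claimed);
    System.out.println("still to be placed: " + cnf); // [o_custkey=3]
  }
}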

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
new file mode 100644
index 0000000..6e78c18
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class PartitionedTableRewriter implements RewriteRule {
+  private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
+
+  private static final String NAME = "Partitioned Table Rewriter";
+  private final Rewriter rewriter = new Rewriter();
+
+  private final TajoConf systemConf;
+
+  public PartitionedTableRewriter(TajoConf conf) {
+    systemConf = conf;
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      for (RelationNode relation : block.getRelations()) {
+        if (relation.getType() == NodeType.SCAN) {
+          TableDesc table = ((ScanNode)relation).getTableDesc();
+          if (table.hasPartition()) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    boolean containsPartitionedTables;
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      containsPartitionedTables = false;
+      for (RelationNode relation : block.getRelations()) {
+        if (relation.getType() == NodeType.SCAN) {
+          TableDesc table = ((ScanNode)relation).getTableDesc();
+          if (table.hasPartition()) {
+            containsPartitionedTables = true;
+          }
+        }
+      }
+      if (containsPartitionedTables) {
+        rewriter.visit(block, plan, block, block.getRoot(), new Stack<LogicalNode>());
+      }
+    }
+    return plan;
+  }
+
+  private static class PartitionPathFilter implements PathFilter {
+    private Schema schema;
+    private EvalNode partitionFilter;
+
+    public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
+      this.schema = schema;
+      this.partitionFilter = partitionFilter;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+      if (tuple == null) { // if it is a file or not acceptable file
+        return false;
+      }
+
+      return partitionFilter.eval(schema, tuple).asBool();
+    }
+
+    @Override
+    public String toString() {
+      return partitionFilter.toString();
+    }
+  }
+
+  /**
+   * It assumes that each conjunctive form corresponds to one column.
+   *
+   * @param partitionColumns the schema of the partition columns
+   * @param conjunctiveForms search conditions corresponding to partition columns.
+   *                         If it is NULL, it means that there is no search condition for this table.
+   * @param tablePath the root path of the partitioned table
+   * @return the partition paths that satisfy all given filter conditions
+   * @throws IOException
+   */
+  private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
+      throws IOException {
+
+    FileSystem fs = tablePath.getFileSystem(systemConf);
+
+    PathFilter [] filters;
+    if (conjunctiveForms == null) {
+      filters = buildAllAcceptingPathFilters(partitionColumns);
+    } else {
+      filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
+    }
+
+    // loop from one to the number of partition columns
+    Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
+
+    for (int i = 1; i < partitionColumns.size(); i++) {
+      // Get all file statuses matched to the i-th level path filter.
+      filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
+    }
+
+    LOG.info("Filtered directory or files: " + filteredPaths.length);
+    return filteredPaths;
+  }
+
+  /**
+   * Build path filters for all levels with a list of filter conditions.
+   *
+   * For example, consider a table partitioned by three columns (i.e., col1, col2, col3).
+   * Then, this method creates three path filters for (col1), (col1, col2), and (col1, col2, col3).
+   *
+   * Corresponding filter conditions are placed on each path filter.
+   * If there is no corresponding expression for a certain column,
+   * the condition is filled with an always-true predicate.
+   *
+   * Assume that a user gives the condition WHERE col1 = 'A' AND col3 = 'C'.
+   * There is no filter condition corresponding to col2.
+   * Then, the path filter conditions correspond to the following:
+   *
+   * The first path filter: col1 = 'A'
+   * The second path filter: col1 = 'A' AND col2 IS NOT NULL
+   * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
+   *
+   * The 'IS NOT NULL' predicate is always true against a partition path.
+   *
+   * @param partitionColumns the schema of the partition columns
+   * @param conjunctiveForms filter conditions in conjunctive normal form
+   * @return a path filter for each partition level
+   */
+  private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
+                                                     EvalNode [] conjunctiveForms) {
+    // Building partition path filters for all levels
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.size()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.size(); i++) { // loop over each partition level
+      target = partitionColumns.getColumn(i);
+
+      for (EvalNode expr : conjunctiveForms) {
+        if (EvalTreeUtil.findUniqueColumns(expr).contains(target)) {
+          // Accumulate one qual per level
+          accumulatedFilters.add(expr);
+        }
+      }
+
+      if (accumulatedFilters.size() < (i + 1)) {
+        accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+      }
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  /**
+   * Build an array of path filters for all levels with all accepting filter condition.
+   * @param partitionColumns The partition columns schema
+   * @return The array of path filters, accepting all partition paths.
+   */
+  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.size()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.size(); i++) { // loop over each partition level
+      target = partitionColumns.getColumn(i);
+      accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+    Path [] paths = new Path[fileStatuses.length];
+    for (int j = 0; j < fileStatuses.length; j++) {
+      paths[j] = fileStatuses[j].getPath();
+    }
+    return paths;
+  }
+
+  private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
+    TableDesc table = scanNode.getTableDesc();
+    PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
+
+    Schema partitionValuesSchema = new Schema();
+    for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
+      partitionValuesSchema.addColumn(column);
+    }
+
+    Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
+
+    // if a query statement has a search condition, try to find indexable predicates
+    if (scanNode.hasQual()) {
+      EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
+      Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
+
+      // add qualifier to schema for qual
+      partitionValuesSchema.setQualifier(scanNode.getCanonicalName());
+      for (Column column : partitionValuesSchema.getColumns()) {
+        for (EvalNode simpleExpr : conjunctiveForms) {
+          if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
+            indexablePredicateSet.add(simpleExpr);
+          }
+        }
+      }
+
+      // Partitions which are not matched to the partition filter conditions are pruned immediately.
+      // So, the partition filter conditions are not necessary later, and they are removed from
+      // original search condition for simplicity and efficiency.
+      remainExprs.removeAll(indexablePredicateSet);
+      if (remainExprs.isEmpty()) {
+        scanNode.setQual(null);
+      } else {
+        scanNode.setQual(
+            AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
+      }
+    }
+
+    if (indexablePredicateSet.size() > 0) { // there is at least one indexable predicate
+      return findFilteredPaths(partitionValuesSchema,
+          indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
+    } else { // otherwise, we will get all partition paths.
+      return findFilteredPaths(partitionValuesSchema, null, table.getPath());
+    }
+  }
+
+  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+    if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
+      Set<Column> variables = EvalTreeUtil.findUniqueColumns(evalNode);
+      // if it contains only single variable matched to a target column
+      return variables.size() == 1 && variables.contains(targetColumn);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Check if an expression consists of one variable and one constant and
+   * the expression is a comparison operator.
+   *
+   * @param evalNode The expression to be checked
+   * @return true if an expression consists of one variable and one constant
+   * and the expression is a comparison operator. Otherwise, false.
+   */
+  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+    // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
+    return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
+  }
+
+  /**
+   *
+   * @param evalNode The expression to be checked
+   * @return true if it is a disjunctive expression consisting of indexable predicates over the same single variable
+   */
+  private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
+    if (evalNode.getType() == EvalType.OR) {
+      boolean indexable =
+          checkIfIndexablePredicate(evalNode.getLeftExpr()) &&
+              checkIfIndexablePredicate(evalNode.getRightExpr());
+
+      boolean sameVariable =
+          EvalTreeUtil.findUniqueColumns(evalNode.getLeftExpr())
+          .equals(EvalTreeUtil.findUniqueColumns(evalNode.getRightExpr()));
+
+      return indexable && sameVariable;
+    } else {
+      return false;
+    }
+  }
+
+  private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
+    if (scanNode.getInputPaths().length > 0) {
+      try {
+        FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
+        long totalVolume = 0;
+
+        for (Path input : scanNode.getInputPaths()) {
+          ContentSummary summary = fs.getContentSummary(input);
+          totalVolume += summary.getLength();
+          totalVolume += summary.getFileCount();
+        }
+        scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
+      } catch (IOException e) {
+        throw new PlanningException(e);
+      }
+    }
+  }
+
+  private final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> {
+    @Override
+    public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode,
+                            Stack<LogicalNode> stack) throws PlanningException {
+
+      TableDesc table = scanNode.getTableDesc();
+      if (!table.hasPartition()) {
+        return null;
+      }
+
+      try {
+        Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
+        plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
+        PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class);
+        rewrittenScanNode.init(scanNode, filteredPaths);
+        updateTableStat(rewrittenScanNode);
+
+        // if it is the topmost node, set it as the root node of this block.
+        if (stack.empty()) {
+          block.setRoot(rewrittenScanNode);
+        } else {
+          PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
+        }
+      } catch (IOException e) {
+        throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
+      }
+      return null;
+    }
+  }
+}
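
The rewriter prunes partitions by matching Hive-style directory names (col1=A/col2=B/...) level by level against the indexable conjuncts. A small sketch of parsing such a path into column/value pairs, roughly the job TupleUtil.buildTupleFromPartitionPath performs before a PartitionPathFilter evaluates its predicate (illustrative only):

import java.util.LinkedHashMap;
import java.util.Map;

// Parse a Hive-style partition path such as "col1=A/col2=B/col3=C" into
// column -> value pairs, the form a partition filter is evaluated against.
public class PartitionPathSketch {
  static Map<String, String> parse(String partitionPath) {
    Map<String, String> values = new LinkedHashMap<>();
    for (String segment : partitionPath.split("/")) {
      int eq = segment.indexOf('=');
      if (eq < 0) {
        return null; // not a partition directory segment (e.g., a data file)
      }
      values.put(segment.substring(0, eq), segment.substring(eq + 1));
    }
    return values;
  }

  public static void main(String[] args) {
    Map<String, String> v = parse("col1=A/col2=B/col3=C");
    // A filter such as col1 = 'A' AND col3 = 'C' would then be evaluated on v.
    System.out.println(v); // {col1=A, col2=B, col3=C}
  }
}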

http://git-wip-us.apache.org/repos/asf/tajo/blob/026368be/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
new file mode 100644
index 0000000..668ed68
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -0,0 +1,966 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.collect.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.SchemaUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+/**
+ * ProjectionPushDownRule deploys expressions in a selection list to proper
+ * {@link org.apache.tajo.engine.planner.logical.Projectable}
+ * nodes. In this process, the expressions are usually pushed down as low as possible.
+ * It also enables scanners to read only necessary columns.
+ */
+public class ProjectionPushDownRule extends
+    BasicLogicalPlanVisitor<ProjectionPushDownRule.Context, LogicalNode> implements RewriteRule {
+  /** Class Logger */
+  private final Log LOG = LogFactory.getLog(ProjectionPushDownRule.class);
+  private static final String NAME = "ProjectionPushDown";
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
+
+    if (PlannerUtil.checkIfDDLPlan(toBeOptimized) || !plan.getRootBlock().hasTableExpression()) {
+      LOG.info("This query skips the logical optimization step.");
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
+
+    LogicalPlan.QueryBlock topmostBlock = rootBlock;
+
+    Stack<LogicalNode> stack = new Stack<LogicalNode>();
+    Context context = new Context(plan);
+    visit(context, plan, topmostBlock, topmostBlock.getRoot(), stack);
+
+    return plan;
+  }
+
+  /**
+   * <h2>What is TargetListManager?</h2>
+   * It manages all expressions used in a query block, and their reference names.
+   * TargetListManager provides a way to find an expression by a reference name.
+   * It keeps a set of expressions, and one or more reference names can point to
+   * the same expression.
+   *
+   * Also, TargetListManager keeps the evaluation state of each expression.
+   * The evaluation state is a boolean state to indicate whether the expression
+   * was evaluated in a descendant node or not. If an expression is evaluated,
+   * the evaluation state is changed to TRUE. It also means that
+   * the expression can be referred to by a column reference instead of being evaluated again.
+   *
+   * Consider an example query:
+   *
+   * SELECT sum(l_orderkey + 1) from lineitem where l_partkey > 1;
+   *
+   * In this case, an expression sum(l_orderkey + 1) is divided into two sub expressions:
+   * <ul>
+   *  <li>$1 <- l_orderkey + 1</li>
+   *  <li>$2 <- sum($1)</li>
+   * </ul>
+   *
+   * <code>$1</code> is a simple arithmetic operation, and $2 is an aggregation function.
+   * <code>$1</code> is evaluated in ScanNode because it's just a simple arithmetic operation.
+   * So, the evaluation state of l_orderkey + 1 is initially
+   * false, but it becomes true after the ScanNode.
+   *
+   * In contrast, sum($1) is evaluated at GroupbyNode. So, its evaluation state is changed
+   * after GroupbyNode.
+   *
+   * <h2>Why is TargetListManager necessary?</h2>
+   *
+   * Expressions used in a query block can be divided into various categories according to
+   * the possible {@link Projectable} nodes. Their references become available depending on
+   * the Projectable node at which expressions are evaluated. TargetListManager tracks the
+   * expressions and their references so that each expression is evaluated at its optimal place.
+   * It removes duplicates and enables common expressions to be shared by two or more Projectable
+   * nodes. It also helps Projectable nodes to find correct column references.
+   */
+  public static class TargetListManager {
+    private Integer seqId = 0;
+
+    /**
+     * Why should we use LinkedHashMap for those maps?
+     *
+     * These maps are mainly populated from the target list of each projectable node
+     * (i.e., ProjectionNode, GroupbyNode, JoinNode, and ScanNode).
+     * The projection node removal occurs only when the projection node's output
+     * schema and its child's output schema are equivalent to each other.
+     *
+     * If we keep the insertion order of all expressions, the possibility
+     * of projection node removal becomes higher.
+     **/
+
+    /** A Map: Name -> Id */
+    private LinkedHashMap<String, Integer> nameToIdBiMap;
+    /** Map: Id <-> EvalNode */
+    private BiMap<Integer, EvalNode> idToEvalBiMap;
+    /** Map: Id -> Names */
+    private LinkedHashMap<Integer, List<String>> idToNamesMap;
+    /** Map: Name -> Boolean */
+    private LinkedHashMap<String, Boolean> evaluationStateMap;
+
+    private LogicalPlan plan;
+
+    public TargetListManager(LogicalPlan plan) {
+      this.plan = plan;
+      nameToIdBiMap = Maps.newLinkedHashMap();
+      idToEvalBiMap = HashBiMap.create();
+      idToNamesMap = Maps.newLinkedHashMap();
+      evaluationStateMap = Maps.newLinkedHashMap();
+    }
+
+    private int getNextSeqId() {
+      return seqId++;
+    }
+
+    /**
+     * Add an expression with a specified name, which is usually an alias.
+     * Later, you can refer to this expression by the specified name.
+     */
+    private String add(String specifiedName, EvalNode evalNode) throws PlanningException {
+
+      // If the name already exists, keep the actual expression
+      // rather than a column reference.
+      if (nameToIdBiMap.containsKey(specifiedName)) {
+        int refId = nameToIdBiMap.get(specifiedName);
+        EvalNode found = idToEvalBiMap.get(refId);
+        if (found != null && !evalNode.equals(found)) {
+          if (found.getType() != EvalType.FIELD && evalNode.getType() != EvalType.FIELD) {
+            throw new PlanningException("Duplicate alias: " + evalNode);
+          }
+          if (found.getType() == EvalType.FIELD) {
+            idToEvalBiMap.forcePut(refId, evalNode);
+          }
+        }
+      }
+
+      int refId;
+      if (idToEvalBiMap.inverse().containsKey(evalNode)) {
+        refId = idToEvalBiMap.inverse().get(evalNode);
+      } else {
+        refId = getNextSeqId();
+        idToEvalBiMap.put(refId, evalNode);
+      }
+
+      nameToIdBiMap.put(specifiedName, refId);
+      TUtil.putToNestedList(idToNamesMap, refId, specifiedName);
+      evaluationStateMap.put(specifiedName, false);
+
+      for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
+        add(new FieldEval(column));
+      }
+
+      return specifiedName;
+    }
+
+    /**
+     * Adds an expression without any name. It returns an automatically
+     * generated name, which can also be used to refer to this expression.
+     */
+    public String add(EvalNode evalNode) throws PlanningException {
+      String name;
+
+      if (idToEvalBiMap.inverse().containsKey(evalNode)) {
+        int refId = idToEvalBiMap.inverse().get(evalNode);
+        return getPrimaryName(refId);
+      }
+
+      if (evalNode.getType() == EvalType.FIELD) {
+        FieldEval fieldEval = (FieldEval) evalNode;
+        name = fieldEval.getName();
+      } else {
+        name = plan.generateUniqueColumnName(evalNode);
+      }
+
+      return add(name, evalNode);
+    }
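+
+    // A hypothetical sketch of the deduplication above: registering the same
+    // expression twice yields the same reference name instead of a new id.
+    //
+    //   String first  = add(expr);  // registers expr and returns its primary name
+    //   String second = add(expr);  // returns the same name as 'first'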
+
+    public Collection<String> getNames() {
+      return nameToIdBiMap.keySet();
+    }
+
+    public String add(Target target) throws PlanningException {
+      return add(target.getCanonicalName(), target.getEvalTree());
+    }
+
+    /**
+     * Each expression can have one or more names.
+     * We call the name first added with an expression its primary name.
+     * The primary name has a special meaning: since duplicated expressions are
+     * removed during logical planning, only the primary name is used to refer
+     * to each expression.
+     *
+     * @param refId The identifier of an expression
+     * @param name The name to check.
+     * @return True if this name is the primary name. Otherwise, false.
+     */
+    private boolean isPrimaryName(int refId, String name) {
+      if (idToNamesMap.get(refId).size() > 0) {
+        return getPrimaryName(refId).equals(name);
+      } else {
+        return false;
+      }
+    }
+
+    private String getPrimaryName(int refId) {
+      return idToNamesMap.get(refId).get(0);
+    }
+
+    public Target getTarget(String name) {
+      if (!nameToIdBiMap.containsKey(name)) {
+        throw new RuntimeException("No Such target name: " + name);
+      }
+      int id = nameToIdBiMap.get(name);
+      EvalNode evalNode = idToEvalBiMap.get(id);
+
+      // If it is a constant value, just return it because a constant can be evaluated anywhere.
+      if (evalNode.getType() == EvalType.CONST) {
+        return new Target(evalNode, name);
+      }
+
+      // If a name is not the primary name, its expression may already have been
+      // evaluated, so the name can just refer to the resulting value. Consider
+      // the following example:
+      //
+      // select l_orderkey + 1 as total1, l_orderkey + 1 as total2 from lineitem
+      //
+      // Here, total2 meets the condition below, so it can just refer to the
+      // result of total1 rather than recomputing l_orderkey + 1.
+      if (!isPrimaryName(id, name) && isEvaluated(getPrimaryName(id))) {
+        evalNode = new FieldEval(getPrimaryName(id), evalNode.getValueType());
+      }
+
+      // If it is a column reference itself, just return the column reference without any alias.
+      if (evalNode.getType() == EvalType.FIELD && evalNode.getName().equals(name)) {
+        return new Target((FieldEval)evalNode);
+      } else { // otherwise, it returns an expression.
+        return new Target(evalNode, name);
+      }
+    }
+
+    public boolean isEvaluated(String name) {
+      if (!nameToIdBiMap.containsKey(name)) {
+        throw new RuntimeException("No Such target name: " + name);
+      }
+      return evaluationStateMap.get(name);
+    }
+
+    public void markAsEvaluated(Target target) {
+      Integer refId = nameToIdBiMap.get(target.getCanonicalName());
+      if (refId == null || !idToNamesMap.containsKey(refId)) {
+        throw new RuntimeException("No such eval: " + target.getEvalTree());
+      }
+      evaluationStateMap.put(target.getCanonicalName(), true);
+    }
+
+    public Iterator<Target> getFilteredTargets(Set<String> required) {
+      return new FilteredTargetIterator(required);
+    }
+
+    class FilteredTargetIterator implements Iterator<Target> {
+      private final Iterator<Target> iterator;
+
+      public FilteredTargetIterator(Set<String> required) {
+        List<Target> filtered = TUtil.newList();
+        for (String name : nameToIdBiMap.keySet()) {
+          if (required.contains(name)) {
+            filtered.add(getTarget(name));
+          }
+        }
+        iterator = filtered.iterator();
+      }
+
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      @Override
+      public Target next() {
+        return iterator.next();
+      }
+
+      @Override
+      public void remove() {
+      }
+    }
+
+    @Override
+    public String toString() {
+      int evaluated = 0;
+      for (Boolean flag: evaluationStateMap.values()) {
+        if (flag) {
+          evaluated++;
+        }
+      }
+      return "eval=" + evaluationStateMap.size() + ", evaluated=" + evaluated;
+    }
+  }
+
+  static class Context {
+    TargetListManager targetListMgr;
+    Set<String> requiredSet;
+
+    public Context(LogicalPlan plan) {
+      requiredSet = new LinkedHashSet<String>();
+      targetListMgr = new TargetListManager(plan);
+    }
+
+    public Context(LogicalPlan plan, Collection<String> requiredSet) {
+      this.requiredSet = new LinkedHashSet<String>(requiredSet);
+      targetListMgr = new TargetListManager(plan);
+    }
+
+    public Context(Context upperContext) {
+      this.requiredSet = new LinkedHashSet<String>(upperContext.requiredSet);
+      targetListMgr = upperContext.targetListMgr;
+    }
+
+    public String addExpr(Target target) throws PlanningException {
+      String reference = targetListMgr.add(target);
+      addNecessaryReferences(target.getEvalTree());
+      return reference;
+    }
+
+    public String addExpr(EvalNode evalNode) throws PlanningException {
+      String reference = targetListMgr.add(evalNode);
+      addNecessaryReferences(evalNode);
+      return reference;
+    }
+
+    private void addNecessaryReferences(EvalNode evalNode) {
+      for (Column column : EvalTreeUtil.findUniqueColumns(evalNode)) {
+        requiredSet.add(column.getQualifiedName());
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "required=" + requiredSet.size() + "," + targetListMgr.toString();
+    }
+  }
+
+  @Override
+  public LogicalNode visitRoot(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LogicalRootNode node,
+                          Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode child = super.visitRoot(context, plan, block, node, stack);
+    node.setInSchema(child.getOutSchema());
+    node.setOutSchema(child.getOutSchema());
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitProjection(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     ProjectionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    Context newContext = new Context(context);
+    Target [] targets = node.getTargets();
+    int targetNum = targets.length;
+    String [] referenceNames = new String[targetNum];
+    for (int i = 0; i < targetNum; i++) {
+      referenceNames[i] = newContext.addExpr(targets[i]);
+    }
+
+    LogicalNode child = super.visitProjection(newContext, plan, block, node, stack);
+
+    node.setInSchema(child.getOutSchema());
+
+    int evaluationCount = 0;
+    List<Target> finalTargets = TUtil.newList();
+    for (String referenceName : referenceNames) {
+      Target target = context.targetListMgr.getTarget(referenceName);
+
+      if (context.targetListMgr.isEvaluated(referenceName)) {
+        finalTargets.add(new Target(new FieldEval(target.getNamedColumn())));
+      } else if (LogicalPlanner.checkIfBeEvaluatedAtThis(target.getEvalTree(), node)) {
+        finalTargets.add(target);
+        context.targetListMgr.markAsEvaluated(target);
+        evaluationCount++;
+      }
+    }
+
+    node.setTargets(finalTargets.toArray(new Target[finalTargets.size()]));
+    LogicalPlanner.verifyProjectedFields(block, node);
+
+    // Removing the ProjectionNode if it is redundant
+    // TODO - Consider INSERT and CTAS statements, and then remove the stack.empty() check.
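+    // For example, in "SELECT l_orderkey FROM lineitem", the ProjectionNode projects only
+    // columns that its child already emits, so the two schemas match and the node is removed.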
+    if (evaluationCount == 0 && PlannerUtil.targetToSchema(finalTargets).equals(child.getOutSchema())) {
+      if (stack.empty()) {
+        // if it is topmost, set it as the root of this block.
+        block.setRoot(child);
+      } else {
+        LogicalNode parentNode = stack.peek();
+        switch (parentNode.getType()) {
+        case ROOT:
+          LogicalRootNode rootNode = (LogicalRootNode) parentNode;
+          rootNode.setChild(child);
+          rootNode.setInSchema(child.getOutSchema());
+          rootNode.setOutSchema(child.getOutSchema());
+          break;
+        case TABLE_SUBQUERY:
+          TableSubQueryNode tableSubQueryNode = (TableSubQueryNode) parentNode;
+          tableSubQueryNode.setSubQuery(child);
+          break;
+        case STORE:
+          StoreTableNode storeTableNode = (StoreTableNode) parentNode;
+          storeTableNode.setChild(child);
+          storeTableNode.setInSchema(child.getOutSchema());
+          break;
+        case INSERT:
+          InsertNode insertNode = (InsertNode) parentNode;
+          insertNode.setSubQuery(child);
+          break;
+        case CREATE_TABLE:
+          CreateTableNode createTableNode = (CreateTableNode) parentNode;
+          createTableNode.setChild(child);
+          createTableNode.setInSchema(child.getOutSchema());
+          break;
+        default:
+          throw new PlanningException("Unexpected Parent Node: " + parentNode.getType());
+        }
+        plan.addHistory("ProjectionNode is eliminated.");
+      }
+
+      return child;
+
+    } else {
+      return node;
+    }
+  }
+
+  @Override
+  public LogicalNode visitLimit(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, LimitNode node,
+                           Stack<LogicalNode> stack) throws PlanningException {
+    LogicalNode child = super.visitLimit(context, plan, block, node, stack);
+
+    node.setInSchema(child.getOutSchema());
+    node.setOutSchema(child.getOutSchema());
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitSort(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                               SortNode node, Stack<LogicalNode> stack) throws PlanningException {
+    Context newContext = new Context(context);
+
+    final int sortKeyNum = node.getSortKeys().length;
+    String [] keyNames = new String[sortKeyNum];
+    for (int i = 0; i < sortKeyNum; i++) {
+      SortSpec sortSpec = node.getSortKeys()[i];
+      keyNames[i] = newContext.addExpr(new FieldEval(sortSpec.getSortKey()));
+    }
+
+    LogicalNode child = super.visitSort(newContext, plan, block, node, stack);
+
+    // Rewrite the sort keys: set the right column names and eliminate duplicated sort keys.
+    List<SortSpec> sortSpecs = new ArrayList<SortSpec>();
+    for (int i = 0; i < keyNames.length; i++) {
+      String sortKey = keyNames[i];
+      Target target = context.targetListMgr.getTarget(sortKey);
+      if (context.targetListMgr.isEvaluated(sortKey)) {
+        Column c = target.getNamedColumn();
+        SortSpec sortSpec = new SortSpec(c, node.getSortKeys()[i].isAscending(), node.getSortKeys()[i].isNullFirst());
+        if (!sortSpecs.contains(sortSpec)) {
+          sortSpecs.add(sortSpec);
+        }
+      } else {
+        if (target.getEvalTree().getType() == EvalType.FIELD) {
+          Column c = ((FieldEval)target.getEvalTree()).getColumnRef();
+          SortSpec sortSpec = new SortSpec(c, node.getSortKeys()[i].isAscending(), node.getSortKeys()[i].isNullFirst());
+          if (!sortSpecs.contains(sortSpec)) {
+            sortSpecs.add(sortSpec);
+          }
+        }
+      }
+    }
+    node.setSortSpecs(sortSpecs.toArray(new SortSpec[sortSpecs.size()]));
+
+    node.setInSchema(child.getOutSchema());
+    node.setOutSchema(child.getOutSchema());
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitHaving(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, HavingNode node,
+                            Stack<LogicalNode> stack) throws PlanningException {
+    Context newContext = new Context(context);
+    String referenceName = newContext.targetListMgr.add(node.getQual());
+    newContext.addNecessaryReferences(node.getQual());
+
+    LogicalNode child = super.visitHaving(newContext, plan, block, node, stack);
+
+    node.setInSchema(child.getOutSchema());
+    node.setOutSchema(child.getOutSchema());
+
+    Target target = context.targetListMgr.getTarget(referenceName);
+    if (newContext.targetListMgr.isEvaluated(referenceName)) {
+      node.setQual(new FieldEval(target.getNamedColumn()));
+    } else {
+      node.setQual(target.getEvalTree());
+      newContext.targetListMgr.markAsEvaluated(target);
+    }
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitGroupBy(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, GroupbyNode node,
+                             Stack<LogicalNode> stack) throws PlanningException {
+    Context newContext = new Context(context);
+
+    // Getting grouping key names
+    final int groupingKeyNum = node.getGroupingColumns().length;
+    String [] groupingKeyNames = null;
+    if (groupingKeyNum > 0) {
+      groupingKeyNames = new String[groupingKeyNum];
+      for (int i = 0; i < groupingKeyNum; i++) {
+        FieldEval fieldEval = new FieldEval(node.getGroupingColumns()[i]);
+        groupingKeyNames[i] = newContext.addExpr(fieldEval);
+      }
+    }
+
+    // Getting eval names
+
+    final String [] aggEvalNames;
+    if (node.hasAggFunctions()) {
+      final int evalNum = node.getAggFunctions().length;
+      aggEvalNames = new String[evalNum];
+      for (int evalIdx = 0, targetIdx = groupingKeyNum; targetIdx < node.getTargets().length; evalIdx++, targetIdx++) {
+        Target target = node.getTargets()[targetIdx];
+        EvalNode evalNode = node.getAggFunctions()[evalIdx];
+        aggEvalNames[evalIdx] = newContext.addExpr(new Target(evalNode, target.getCanonicalName()));
+      }
+    } else {
+      aggEvalNames = null;
+    }
+
+    // visit a child node
+    LogicalNode child = super.visitGroupBy(newContext, plan, block, node, stack);
+
+    node.setInSchema(child.getOutSchema());
+
+    List<Target> targets = Lists.newArrayList();
+    if (groupingKeyNum > 0 && groupingKeyNames != null) {
+      // Restoring grouping key columns
+      final List<Column> groupingColumns = new ArrayList<Column>();
+      for (int i = 0; i < groupingKeyNum; i++) {
+        String groupingKey = groupingKeyNames[i];
+
+        Target target = context.targetListMgr.getTarget(groupingKey);
+
+        // Rewrite the grouping keys: set the right column names and
+        // eliminate duplicated grouping keys.
+        if (context.targetListMgr.isEvaluated(groupingKey)) {
+          Column c = target.getNamedColumn();
+          if (!groupingColumns.contains(c)) {
+            groupingColumns.add(c);
+            targets.add(new Target(new FieldEval(target.getNamedColumn())));
+          }
+        } else {
+          if (target.getEvalTree().getType() == EvalType.FIELD) {
+            Column c = ((FieldEval)target.getEvalTree()).getColumnRef();
+            if (!groupingColumns.contains(c)) {
+              groupingColumns.add(c);
+              targets.add(target);
+              context.targetListMgr.markAsEvaluated(target);
+            }
+          } else {
+            throw new PlanningException("Cannot evaluate this expression in grouping keys: " + target.getEvalTree());
+          }
+        }
+      }
+
+      node.setGroupingColumns(groupingColumns.toArray(new Column[groupingColumns.size()]));
+    }
+
+    // Getting projected targets
+    if (node.hasAggFunctions() && aggEvalNames != null) {
+      AggregationFunctionCallEval [] aggEvals = new AggregationFunctionCallEval[aggEvalNames.length];
+      int i = 0;
+      for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) {
+
+        String referenceName = it.next();
+        Target target = context.targetListMgr.getTarget(referenceName);
+
+        if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) {
+          aggEvals[i++] = (AggregationFunctionCallEval) target.getEvalTree();
+          context.targetListMgr.markAsEvaluated(target);
+        }
+      }
+      if (aggEvals.length > 0) {
+        node.setAggFunctions(aggEvals);
+      }
+    }
+    Target [] finalTargets = buildGroupByTarget(node, targets, aggEvalNames);
+    node.setTargets(finalTargets);
+
+    LogicalPlanner.verifyProjectedFields(block, node);
+
+    return node;
+  }
+
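+  /**
+   * Builds the final target list of a GroupbyNode: grouping key targets come first,
+   * followed by field references to the aggregation results. If groupingKeyTargets
+   * is null, field evals are created directly from the node's grouping columns.
+   */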
+  public static Target [] buildGroupByTarget(GroupbyNode groupbyNode, @Nullable List<Target> groupingKeyTargets,
+                                             String [] aggEvalNames) {
+    final int groupingKeyNum =
+        groupingKeyTargets == null ? groupbyNode.getGroupingColumns().length : groupingKeyTargets.size();
+    final int aggrFuncNum = aggEvalNames != null ? aggEvalNames.length : 0;
+    EvalNode [] aggEvalNodes = groupbyNode.getAggFunctions();
+    Target [] targets = new Target[groupingKeyNum + aggrFuncNum];
+
+    if (groupingKeyTargets != null) {
+      for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
+        targets[groupingKeyIdx] = groupingKeyTargets.get(groupingKeyIdx);
+      }
+    } else {
+      for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
+        targets[groupingKeyIdx] = new Target(new FieldEval(groupbyNode.getGroupingColumns()[groupingKeyIdx]));
+      }
+    }
+
+    if (aggEvalNames != null) {
+      for (int aggrFuncIdx = 0, targetIdx = groupingKeyNum; aggrFuncIdx < aggrFuncNum; aggrFuncIdx++, targetIdx++) {
+        targets[targetIdx] =
+            new Target(new FieldEval(aggEvalNames[aggrFuncIdx], aggEvalNodes[aggrFuncIdx].getValueType()));
+      }
+    }
+
+    return targets;
+  }
+
+  @Override
+  public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                 SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
+    Context newContext = new Context(context);
+    String referenceName = newContext.targetListMgr.add(node.getQual());
+    newContext.addNecessaryReferences(node.getQual());
+
+    LogicalNode child = super.visitFilter(newContext, plan, block, node, stack);
+
+    node.setInSchema(child.getOutSchema());
+    node.setOutSchema(child.getOutSchema());
+
+    Target target = context.targetListMgr.getTarget(referenceName);
+    if (newContext.targetListMgr.isEvaluated(referenceName)) {
+      node.setQual(new FieldEval(target.getNamedColumn()));
+    } else {
+      node.setQual(target.getEvalTree());
+      newContext.targetListMgr.markAsEvaluated(target);
+    }
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitJoin(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode node,
+                          Stack<LogicalNode> stack) throws PlanningException {
+    Context newContext = new Context(context);
+
+    String joinQualReference = null;
+    if (node.hasJoinQual()) {
+      joinQualReference = newContext.addExpr(node.getJoinQual());
+      newContext.addNecessaryReferences(node.getJoinQual());
+    }
+
+    String [] referenceNames = null;
+    if (node.hasTargets()) {
+      referenceNames = new String[node.getTargets().length];
+      int i = 0;
+      for (Iterator<Target> it = getFilteredTarget(node.getTargets(), context.requiredSet); it.hasNext();) {
+        Target target = it.next();
+        referenceNames[i++] = newContext.addExpr(target);
+      }
+    }
+
+    stack.push(node);
+    LogicalNode left = visit(newContext, plan, block, node.getLeftChild(), stack);
+    LogicalNode right = visit(newContext, plan, block, node.getRightChild(), stack);
+    stack.pop();
+
+    Schema merged = SchemaUtil.merge(left.getOutSchema(), right.getOutSchema());
+
+    node.setInSchema(merged);
+
+    if (node.hasJoinQual()) {
+      Target target = context.targetListMgr.getTarget(joinQualReference);
+      if (newContext.targetListMgr.isEvaluated(joinQualReference)) {
+        throw new PlanningException("Join condition must be evaluated in the proper Join Node: " + joinQualReference);
+      } else {
+        node.setJoinQual(target.getEvalTree());
+        newContext.targetListMgr.markAsEvaluated(target);
+      }
+    }
+
+    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
+    for (Iterator<String> it = getFilteredReferences(context.targetListMgr.getNames(),
+        context.requiredSet); it.hasNext();) {
+      String referenceName = it.next();
+      Target target = context.targetListMgr.getTarget(referenceName);
+
+      if (context.targetListMgr.isEvaluated(referenceName)) {
+        Target fieldReference = new Target(new FieldEval(target.getNamedColumn()));
+        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, fieldReference.getEvalTree(), node,
+            stack.peek().getType() != NodeType.JOIN)) {
+          projectedTargets.add(fieldReference);
+        }
+      } else if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, target.getEvalTree(), node,
+          stack.peek().getType() != NodeType.JOIN)) {
+        projectedTargets.add(target);
+        context.targetListMgr.markAsEvaluated(target);
+      }
+    }
+
+    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+    LogicalPlanner.verifyProjectedFields(block, node);
+    return node;
+  }
+
+  static Iterator<String> getFilteredReferences(Collection<String> targetNames, Set<String> required) {
+    return new FilteredStringsIterator(targetNames, required);
+  }
+
+  static Iterator<String> getFilteredReferences(String [] targetNames, Collection<String> required) {
+    return new FilteredStringsIterator(targetNames, required);
+  }
+
+  static class FilteredStringsIterator implements Iterator<String> {
+    Iterator<String> iterator;
+
+    FilteredStringsIterator(Collection<String> targetNames, Collection<String> required) {
+      List<String> filtered = TUtil.newList();
+      for (String name : targetNames) {
+        if (required.contains(name)) {
+          filtered.add(name);
+        }
+      }
+
+      iterator = filtered.iterator();
+    }
+
+    FilteredStringsIterator(String [] targetNames, Collection<String> required) {
+      this(TUtil.newList(targetNames), required);
+    }
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public String next() {
+      return iterator.next();
+    }
+
+    @Override
+    public void remove() {
+    }
+  }
+
+  static Iterator<Target> getFilteredTarget(Target[] targets, Set<String> required) {
+    return new FilteredIterator(targets, required);
+  }
+
+  static class FilteredIterator implements Iterator<Target> {
+    Iterator<Target> iterator;
+
+    FilteredIterator(Target [] targets, Set<String> requiredReferences) {
+      List<Target> filtered = TUtil.newList();
+      Map<String, Target> targetSet = new HashMap<String, Target>();
+      for (Target t : targets) {
+        // We should keep only a raw target instead of a field reference.
+        if (targetSet.containsKey(t.getCanonicalName())) {
+          Target targetInSet = targetSet.get(t.getCanonicalName());
+          EvalNode evalNode = targetInSet.getEvalTree();
+          if (evalNode.getType() == EvalType.FIELD && t.getEvalTree().getType() != EvalType.FIELD) {
+            targetSet.put(t.getCanonicalName(), t);
+          }
+        } else {
+          targetSet.put(t.getCanonicalName(), t);
+        }
+      }
+
+      for (String name : requiredReferences) {
+        if (targetSet.containsKey(name)) {
+          filtered.add(targetSet.get(name));
+        }
+      }
+
+      iterator = filtered.iterator();
+    }
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public Target next() {
+      return iterator.next();
+    }
+
+    @Override
+    public void remove() {
+    }
+  }
+
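+  /**
+   * The left and right children of a union belong to different query blocks.
+   * Each side is therefore visited with a fresh Context whose required names are
+   * re-qualified for that block, and with its own empty stack.
+   */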
+  @Override
+  public LogicalNode visitUnion(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, UnionNode node,
+                           Stack<LogicalNode> stack) throws PlanningException {
+
+    LogicalPlan.QueryBlock leftBlock = plan.getBlock(node.getLeftChild());
+    LogicalPlan.QueryBlock rightBlock = plan.getBlock(node.getRightChild());
+
+    Context leftContext = new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet,
+        leftBlock.getName()));
+    Context rightContext = new Context(plan, PlannerUtil.toQualifiedFieldNames(context.requiredSet,
+        rightBlock.getName()));
+
+    stack.push(node);
+    visit(leftContext, plan, leftBlock, leftBlock.getRoot(), new Stack<LogicalNode>());
+    visit(rightContext, plan, rightBlock, rightBlock.getRoot(), new Stack<LogicalNode>());
+    stack.pop();
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
+                          Stack<LogicalNode> stack) throws PlanningException {
+
+    Context newContext = new Context(context);
+
+    Target [] targets;
+    if (node.hasTargets()) {
+      targets = node.getTargets();
+    } else {
+      targets = PlannerUtil.schemaToTargets(node.getTableSchema());
+    }
+
+    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
+    for (Iterator<Target> it = getFilteredTarget(targets, newContext.requiredSet); it.hasNext();) {
+      Target target = it.next();
+      newContext.addExpr(target);
+    }
+
+    for (Iterator<Target> it = getFilteredTarget(targets, context.requiredSet); it.hasNext();) {
+      Target target = it.next();
+
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) {
+        projectedTargets.add(target);
+        newContext.targetListMgr.markAsEvaluated(target);
+      }
+    }
+
+    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+    LogicalPlanner.verifyProjectedFields(block, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitPartitionedTableScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                               PartitionedTableScanNode node, Stack<LogicalNode> stack)
+      throws PlanningException {
+
+    Context newContext = new Context(context);
+
+    Target [] targets;
+    if (node.hasTargets()) {
+      targets = node.getTargets();
+    } else {
+      targets = PlannerUtil.schemaToTargets(node.getOutSchema());
+    }
+
+    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
+    for (Iterator<Target> it = getFilteredTarget(targets, newContext.requiredSet); it.hasNext();) {
+      Target target = it.next();
+      newContext.addExpr(target);
+    }
+
+    for (Iterator<Target> it = getFilteredTarget(targets, context.requiredSet); it.hasNext();) {
+      Target target = it.next();
+
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) {
+        projectedTargets.add(target);
+        newContext.targetListMgr.markAsEvaluated(target);
+      }
+    }
+
+    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+    LogicalPlanner.verifyProjectedFields(block, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                   TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
+    Context childContext = new Context(plan, upperContext.requiredSet);
+    stack.push(node);
+    LogicalNode child = super.visitTableSubQuery(childContext, plan, block, node, stack);
+    node.setSubQuery(child);
+    stack.pop();
+
+    Context newContext = new Context(upperContext);
+
+    Target [] targets;
+    if (node.hasTargets()) {
+      targets = node.getTargets();
+    } else {
+      targets = PlannerUtil.schemaToTargets(node.getOutSchema());
+    }
+
+    LinkedHashSet<Target> projectedTargets = Sets.newLinkedHashSet();
+    for (Iterator<Target> it = getFilteredTarget(targets, newContext.requiredSet); it.hasNext();) {
+      Target target = it.next();
+      childContext.addExpr(target);
+    }
+
+    for (Iterator<Target> it = getFilteredTarget(targets, upperContext.requiredSet); it.hasNext();) {
+      Target target = it.next();
+
+      if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, target.getEvalTree(), node)) {
+        projectedTargets.add(target);
+        childContext.targetListMgr.markAsEvaluated(target);
+      }
+    }
+
+    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+    LogicalPlanner.verifyProjectedFields(block, node);
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitInsert(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block, InsertNode node,
+                            Stack<LogicalNode> stack) throws PlanningException {
+    stack.push(node);
+    visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return node;
+  }
+}

