tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [1/5] tajo git commit: TAJO-1300: Merge the index branch into the master branch.
Date Thu, 30 Jul 2015 04:38:59 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 97e61e6f4 -> 9840d3785


http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java
new file mode 100644
index 0000000..afabe7a
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/AccessPathRewriter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.RelationNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+
+import java.util.List;
+import java.util.Stack;
+
+public class AccessPathRewriter implements LogicalPlanRewriteRule {
+  private static final Log LOG = LogFactory.getLog(AccessPathRewriter.class);
+
+  private static final String NAME = "Access Path Rewriter";
+  private Rewriter rewriter = new Rewriter();
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    if (context.getQueryContext().getBool(SessionVars.INDEX_ENABLED)) {
+      for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) {
+        for (RelationNode relationNode : block.getRelations()) {
+          List<AccessPathInfo> accessPathInfos = block.getAccessInfos(relationNode);
+          // If there are any alternative access paths
+          if (accessPathInfos.size() > 1) {
+            for (AccessPathInfo accessPathInfo : accessPathInfos) {
+              if (accessPathInfo.getScanType() == AccessPathInfo.ScanTypeControl.INDEX_SCAN) {
+                return true;
+              }
+            }
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException {
+    LogicalPlan plan = context.getPlan();
+    LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
+    rewriter.init(context.getQueryContext());
+    rewriter.visit(rootBlock, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
+    return plan;
+  }
+
+  private final class Rewriter extends BasicLogicalPlanVisitor<Object, Object> {
+
+    private OverridableConf conf;
+
+    public void init(OverridableConf conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public Object visitScan(Object object, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode scanNode,
+                            Stack<LogicalNode> stack) throws TajoException {
+      List<AccessPathInfo> accessPaths = block.getAccessInfos(scanNode);
+      AccessPathInfo optimalPath = null;
+      // initialize
+      for (AccessPathInfo accessPath : accessPaths) {
+        if (accessPath.getScanType() == AccessPathInfo.ScanTypeControl.SEQ_SCAN) {
+          optimalPath = accessPath;
+          break;
+        }
+      }
+      // find the optimal path
+      for (AccessPathInfo accessPath : accessPaths) {
+        if (accessPath.getScanType() == AccessPathInfo.ScanTypeControl.INDEX_SCAN) {
+          // estimation selectivity and choose the better path
+          // TODO: improve the selectivity estimation
+          double estimateSelectivity = 0.001;
+          double selectivityThreshold = conf.getFloat(SessionVars.INDEX_SELECTIVITY_THRESHOLD);
+          LOG.info("Selectivity threshold: " + selectivityThreshold);
+          LOG.info("Estimated selectivity: " + estimateSelectivity);
+          if (estimateSelectivity < selectivityThreshold) {
+            // if the estimated selectivity is greater than threshold, use the index scan
+            optimalPath = accessPath;
+          }
+        }
+      }
+
+      if (optimalPath != null && optimalPath.getScanType() == AccessPathInfo.ScanTypeControl.INDEX_SCAN) {
+        IndexScanInfo indexScanInfo = (IndexScanInfo) optimalPath;
+        plan.addHistory("AccessPathRewriter chooses the index scan for " + scanNode.getTableName());
+        IndexScanNode indexScanNode = new IndexScanNode(plan.newPID(), scanNode, indexScanInfo.getKeySchema(),
+            indexScanInfo.getPredicates(), indexScanInfo.getIndexPath());
+        if (stack.empty() || block.getRoot().equals(scanNode)) {
+          block.setRoot(indexScanNode);
+        } else {
+          PlannerUtil.replaceNode(plan, stack.peek(), scanNode, indexScanNode);
+        }
+      }
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
index 98e674e..4fb8aac 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
@@ -21,10 +21,13 @@ package org.apache.tajo.plan.rewrite.rules;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.datum.Datum;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -35,7 +38,10 @@ import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.LogicalPlan.QueryBlock;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule.FilterPushDownContext;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
+import org.apache.tajo.plan.util.IndexUtil;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
@@ -52,6 +58,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   private final static Log LOG = LogFactory.getLog(FilterPushDownRule.class);
   private static final String NAME = "FilterPushDown";
 
+  private CatalogService catalog;
+
   static class FilterPushDownContext {
     Set<EvalNode> pushingDownFilters = TUtil.newHashSet();
 
@@ -85,8 +93,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) {
       if (block.hasNode(NodeType.SELECTION) || block.hasNode(NodeType.JOIN)) {
         return true;
       }
@@ -95,7 +103,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext rewriteRuleContext) throws TajoException {
     /*
     FilterPushDown rule: processing when visits each node
       - If a target which is corresponding on a filter EvalNode's column is not FieldEval, do not PushDown.
@@ -107,6 +115,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
         . It not, create new HavingNode and set parent's child.
      */
     FilterPushDownContext context = new FilterPushDownContext();
+    LogicalPlan plan = rewriteRuleContext.getPlan();
+    catalog = rewriteRuleContext.getCatalog();
     for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
       context.clear();
       this.visit(context, plan, block, block.getRoot(), new Stack<LogicalNode>());
@@ -877,7 +887,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
   @Override
   public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
-                               LogicalPlan.QueryBlock block, ScanNode scanNode,
+                               LogicalPlan.QueryBlock block, final ScanNode scanNode,
                                Stack<LogicalNode> stack) throws TajoException {
     List<EvalNode> matched = TUtil.newList();
 
@@ -947,8 +957,42 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
       qual = matched.iterator().next();
     }
 
+    block.addAccessPath(scanNode, new SeqScanInfo(table));
     if (qual != null) { // if a matched qual exists
       scanNode.setQual(qual);
+
+      // Index path can be identified only after filters are pushed into each scan.
+      String databaseName, tableName;
+      databaseName = CatalogUtil.extractQualifier(table.getName());
+      tableName = CatalogUtil.extractSimpleName(table.getName());
+      Set<Predicate> predicates = TUtil.newHashSet();
+      for (EvalNode eval : IndexUtil.getAllEqualEvals(qual)) {
+        BinaryEval binaryEval = (BinaryEval) eval;
+        // TODO: consider more complex predicates
+        if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
+            binaryEval.getRightExpr().getType() == EvalType.CONST) {
+          predicates.add(new Predicate(binaryEval.getType(),
+              ((FieldEval) binaryEval.getLeftExpr()).getColumnRef(),
+              ((ConstEval)binaryEval.getRightExpr()).getValue()));
+        } else if (binaryEval.getLeftExpr().getType() == EvalType.CONST &&
+            binaryEval.getRightExpr().getType() == EvalType.FIELD) {
+          predicates.add(new Predicate(binaryEval.getType(),
+              ((FieldEval) binaryEval.getRightExpr()).getColumnRef(),
+              ((ConstEval)binaryEval.getLeftExpr()).getValue()));
+        }
+      }
+
+      // for every subset of the set of columns, find all matched index paths
+      for (Set<Predicate> subset : Sets.powerSet(predicates)) {
+        if (subset.size() == 0)
+          continue;
+        Column[] columns = extractColumns(subset);
+        if (catalog.existIndexByColumns(databaseName, tableName, columns)) {
+          IndexDesc indexDesc = catalog.getIndexByColumns(databaseName, tableName, columns);
+          block.addAccessPath(scanNode, new IndexScanInfo(
+              table.getStats(), indexDesc, getSimplePredicates(indexDesc, subset)));
+        }
+      }
     }
 
     for (EvalNode matchedEval: matched) {
@@ -961,6 +1005,49 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     return scanNode;
   }
 
+  private static class Predicate {
+    Column column;
+    Datum value;
+    EvalType evalType;
+
+    public Predicate(EvalType evalType, Column column, Datum value) {
+      this.evalType = evalType;
+      this.column = column;
+      this.value = value;
+    }
+  }
+
+  private static SimplePredicate[] getSimplePredicates(IndexDesc desc, Set<Predicate> predicates) {
+    SimplePredicate[] simplePredicates = new SimplePredicate[predicates.size()];
+    Map<Column, Datum> colToValue = TUtil.newHashMap();
+    for (Predicate predicate : predicates) {
+      colToValue.put(predicate.column, predicate.value);
+    }
+    SortSpec [] keySortSpecs = desc.getKeySortSpecs();
+    for (int i = 0; i < keySortSpecs.length; i++) {
+      simplePredicates[i] = new SimplePredicate(keySortSpecs[i],
+          colToValue.get(keySortSpecs[i].getSortKey()));
+    }
+    return simplePredicates;
+  }
+
+  private static Datum[] extractPredicateValues(List<Predicate> predicates) {
+    Datum[] values = new Datum[predicates.size()];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = predicates.get(i).value;
+    }
+    return values;
+  }
+
+  private static Column[] extractColumns(Set<Predicate> predicates) {
+    Column[] columns = new Column[predicates.size()];
+    int i = 0;
+    for (Predicate p : predicates) {
+      columns[i++] = p.column;
+    }
+    return columns;
+  }
+
   private void errorFilterPushDown(LogicalPlan plan, LogicalNode node,
                                    FilterPushDownContext context) {
     String prefix = "";

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
new file mode 100644
index 0000000..9ac8ccf
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/IndexScanInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.IndexDesc;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.plan.serder.EvalNodeDeserializer;
+import org.apache.tajo.plan.serder.EvalNodeSerializer;
+import org.apache.tajo.plan.serder.PlanProto.SimplePredicateProto;
+
+import java.net.URI;
+
+public class IndexScanInfo extends AccessPathInfo {
+
+  /**
+   * Simple predicate represents an equal eval expression which consists of
+   * a column and a value.
+   */
+  // TODO: extend to represent more complex expressions
+  public static class SimplePredicate implements ProtoObject<SimplePredicateProto> {
+    @Expose private SortSpec keySortSpec;
+    @Expose private Datum value;
+
+    public SimplePredicate(SortSpec keySortSpec, Datum value) {
+      this.keySortSpec = keySortSpec;
+      this.value = value;
+    }
+
+    public SimplePredicate(SimplePredicateProto proto) {
+      keySortSpec = new SortSpec(proto.getKeySortSpec());
+      value = EvalNodeDeserializer.deserialize(proto.getValue());
+    }
+
+    public SortSpec getKeySortSpec() {
+      return keySortSpec;
+    }
+
+    public Datum getValue() {
+      return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof SimplePredicate) {
+        SimplePredicate other = (SimplePredicate) o;
+        return this.keySortSpec.equals(other.keySortSpec) && this.value.equals(other.value);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public Object clone() throws CloneNotSupportedException {
+      SimplePredicate clone = new SimplePredicate(this.keySortSpec, this.value);
+      return clone;
+    }
+
+    @Override
+    public SimplePredicateProto getProto() {
+      SimplePredicateProto.Builder builder = SimplePredicateProto.newBuilder();
+      builder.setKeySortSpec(keySortSpec.getProto());
+      builder.setValue(EvalNodeSerializer.serialize(value));
+      return builder.build();
+    }
+  }
+
+  private final URI indexPath;
+  private final Schema keySchema;
+  private final SimplePredicate[] predicates;
+
+  public IndexScanInfo(TableStats tableStats, IndexDesc indexDesc, SimplePredicate[] predicates) {
+    super(ScanTypeControl.INDEX_SCAN, tableStats);
+    this.indexPath = indexDesc.getIndexPath();
+    keySchema = new Schema();
+    this.predicates = predicates;
+    for (SimplePredicate predicate : predicates) {
+      keySchema.addColumn(predicate.getKeySortSpec().getSortKey());
+    }
+  }
+
+  public URI getIndexPath() {
+    return indexPath;
+  }
+
+  public Schema getKeySchema() {
+    return keySchema;
+  }
+
+  public SimplePredicate[] getPredicates() {
+    return predicates;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
index afe0c4d..c35194e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/LogicalPlanEqualityTester.java
@@ -18,11 +18,11 @@
 
 package org.apache.tajo.plan.rewrite.rules;
 
-import org.apache.tajo.OverridableConf;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
 import org.apache.tajo.plan.serder.LogicalNodeSerializer;
 import org.apache.tajo.plan.serder.PlanProto;
@@ -40,15 +40,16 @@ public class LogicalPlanEqualityTester implements LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
     return true;
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException {
+    LogicalPlan plan = context.getPlan();
     LogicalNode root = plan.getRootBlock().getRoot();
     PlanProto.LogicalNodeTree serialized = LogicalNodeSerializer.serialize(plan.getRootBlock().getRoot());
-    LogicalNode deserialized = LogicalNodeDeserializer.deserialize(queryContext, null, serialized);
+    LogicalNode deserialized = LogicalNodeDeserializer.deserialize(context.getQueryContext(), null, serialized);
     assert root.deepEquals(deserialized);
     return plan;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index 71b9fd7..244d385 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -36,6 +36,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.storage.Tuple;
@@ -59,8 +60,8 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    for (LogicalPlan.QueryBlock block : context.getPlan().getQueryBlocks()) {
       for (RelationNode relation : block.getRelations()) {
         if (relation.getType() == NodeType.SCAN) {
           TableDesc table = ((ScanNode)relation).getTableDesc();
@@ -74,9 +75,10 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException {
+    LogicalPlan plan = context.getPlan();
     LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
-    rewriter.visit(queryContext, plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
+    rewriter.visit(context.getQueryContext(), plan, rootBlock, rootBlock.getRoot(), new Stack<LogicalNode>());
     return plan;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
index 650a484..78a5dad 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
@@ -37,6 +37,7 @@ import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.util.TUtil;
@@ -61,13 +62,13 @@ public class ProjectionPushDownRule extends
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    LogicalNode toBeOptimized = plan.getRootBlock().getRoot();
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    LogicalNode toBeOptimized = context.getPlan().getRootBlock().getRoot();
 
     if (PlannerUtil.checkIfDDLPlan(toBeOptimized)) {
       return false;
     }
-    for (QueryBlock eachBlock: plan.getQueryBlocks()) {
+    for (QueryBlock eachBlock: context.getPlan().getQueryBlocks()) {
       if (eachBlock.hasTableExpression()) {
         return true;
       }
@@ -76,7 +77,8 @@ public class ProjectionPushDownRule extends
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext rewriteRuleContext) throws TajoException {
+    LogicalPlan plan = rewriteRuleContext.getPlan();
     LogicalPlan.QueryBlock rootBlock = plan.getRootBlock();
 
     LogicalPlan.QueryBlock topmostBlock = rootBlock;
@@ -524,6 +526,11 @@ public class ProjectionPushDownRule extends
           createTableNode.setChild(child);
           createTableNode.setInSchema(child.getOutSchema());
           break;
+        case CREATE_INDEX:
+          CreateIndexNode createIndexNode = (CreateIndexNode) parentNode;
+          createIndexNode.setChild(child);
+          createIndexNode.setInSchema(child.getOutSchema());
+          break;
         default:
           throw new TajoInternalError("unexpected parent node: " + parentNode.getType());
         }
@@ -1111,6 +1118,12 @@ public class ProjectionPushDownRule extends
   }
 
   @Override
+  public LogicalNode visitIndexScan(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    IndexScanNode node, Stack<LogicalNode> stack) throws TajoException {
+    return visitScan(context, plan, block,node, stack);
+  }
+
+  @Override
   public LogicalNode visitTableSubQuery(Context upperContext, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                    TableSubQueryNode node, Stack<LogicalNode> stack) throws TajoException {
     Context childContext = new Context(plan, upperContext.requiredSet);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java
new file mode 100644
index 0000000..0c054bd
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/SeqScanInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite.rules;
+
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.statistics.TableStats;
+
+public class SeqScanInfo extends AccessPathInfo {
+  private TableDesc tableDesc;
+
+  public SeqScanInfo(TableStats tableStats) {
+    super(ScanTypeControl.SEQ_SCAN, tableStats);
+  }
+
+  public SeqScanInfo(TableDesc tableDesc) {
+    this(tableDesc.getStats());
+    this.setTableDesc(tableDesc);
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  public void setTableDesc(TableDesc tableDesc) {
+    this.tableDesc = tableDesc;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index fe900c0..25897f2 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -34,10 +34,12 @@ import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.*;
 
 /**
@@ -124,6 +126,9 @@ public class LogicalNodeDeserializer {
       case SCAN:
         current = convertScan(context, evalContext, protoNode);
         break;
+      case INDEX_SCAN:
+        current = convertIndexScan(context, evalContext, protoNode);
+        break;
 
       case CREATE_TABLE:
         current = convertCreateTable(nodeMap, protoNode);
@@ -152,6 +157,13 @@ public class LogicalNodeDeserializer {
         current = convertTruncateTable(protoNode);
         break;
 
+      case CREATE_INDEX:
+        current = convertCreateIndex(nodeMap, protoNode);
+        break;
+      case DROP_INDEX:
+        current = convertDropIndex(protoNode);
+        break;
+
       default:
         throw new RuntimeException("Unknown NodeType: " + protoNode.getType().name());
       }
@@ -427,6 +439,23 @@ public class LogicalNodeDeserializer {
     scan.setOutSchema(convertSchema(protoNode.getOutSchema()));
   }
 
+  private static IndexScanNode convertIndexScan(OverridableConf context, EvalContext evalContext,
+                                                PlanProto.LogicalNode protoNode) {
+    IndexScanNode indexScan = new IndexScanNode(protoNode.getNodeId());
+    fillScanNode(context, evalContext, protoNode, indexScan);
+
+    PlanProto.IndexScanSpec indexScanSpec = protoNode.getIndexScan();
+    SimplePredicate[] predicates = new SimplePredicate[indexScanSpec.getPredicatesCount()];
+    for (int i = 0; i < predicates.length; i++) {
+      predicates[i] = new SimplePredicate(indexScanSpec.getPredicates(i));
+    }
+
+    indexScan.set(new Schema(indexScanSpec.getKeySchema()), predicates,
+        TUtil.stringToURI(indexScanSpec.getIndexPath()));
+
+    return indexScan;
+  }
+
   private static PartitionedTableScanNode convertPartitionScan(OverridableConf context, EvalContext evalContext,
                                                                PlanProto.LogicalNode protoNode) {
     PartitionedTableScanNode partitionedScan = new PartitionedTableScanNode(protoNode.getNodeId());
@@ -628,6 +657,46 @@ public class LogicalNodeDeserializer {
     return truncateTable;
   }
 
+  private static CreateIndexNode convertCreateIndex(Map<Integer, LogicalNode> nodeMap,
+                                                    PlanProto.LogicalNode protoNode) {
+    CreateIndexNode createIndex = new CreateIndexNode(protoNode.getNodeId());
+
+    PlanProto.CreateIndexNode createIndexProto = protoNode.getCreateIndex();
+    createIndex.setIndexName(createIndexProto.getIndexName());
+    createIndex.setIndexMethod(createIndexProto.getIndexMethod());
+    try {
+      createIndex.setIndexPath(new URI(createIndexProto.getIndexPath()));
+    } catch (URISyntaxException e) {
+      e.printStackTrace();
+    }
+    SortSpec[] keySortSpecs = new SortSpec[createIndexProto.getKeySortSpecsCount()];
+    for (int i = 0; i < keySortSpecs.length; i++) {
+      keySortSpecs[i] = new SortSpec(createIndexProto.getKeySortSpecs(i));
+    }
+    createIndex.setKeySortSpecs(new Schema(createIndexProto.getTargetRelationSchema()),
+        keySortSpecs);
+    createIndex.setUnique(createIndexProto.getIsUnique());
+    createIndex.setClustered(createIndexProto.getIsClustered());
+    if (createIndexProto.hasIndexProperties()) {
+      createIndex.setOptions(new KeyValueSet(createIndexProto.getIndexProperties()));
+    }
+    createIndex.setChild(nodeMap.get(createIndexProto.getChildSeq()));
+    createIndex.setInSchema(convertSchema(protoNode.getInSchema()));
+    createIndex.setOutSchema(convertSchema(protoNode.getOutSchema()));
+    createIndex.setExternal(createIndexProto.getIsExternal());
+
+    return createIndex;
+  }
+
+  private static DropIndexNode convertDropIndex(PlanProto.LogicalNode protoNode) {
+    DropIndexNode dropIndex = new DropIndexNode(protoNode.getNodeId());
+
+    PlanProto.DropIndexNode dropIndexProto = protoNode.getDropIndex();
+    dropIndex.setIndexName(dropIndexProto.getIndexName());
+
+    return dropIndex;
+  }
+
   private static AggregationFunctionCallEval [] convertAggFuncCallEvals(OverridableConf context, EvalContext evalContext,
                                                                        List<PlanProto.EvalNodeTree> evalTrees) {
     AggregationFunctionCallEval [] aggFuncs = new AggregationFunctionCallEval[evalTrees.size()];

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index 0207932..fe69fc1 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -21,13 +21,16 @@ package org.apache.tajo.plan.serder;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.SortSpecProto;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.AddColumn;
 import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameColumn;
 import org.apache.tajo.plan.serder.PlanProto.AlterTableNode.RenameTable;
@@ -105,6 +108,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     private LogicalNodeTree.Builder treeBuilder = LogicalNodeTree.newBuilder();
   }
 
+  @Override
   public LogicalNode visitRoot(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                LogicalRootNode root, Stack<LogicalNode> stack) throws TajoException {
     super.visitRoot(context, plan, block, root, stack);
@@ -139,6 +143,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitEvalExpr(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                    EvalExprNode exprEval, Stack<LogicalNode> stack) throws TajoException {
     PlanProto.EvalExprNode.Builder exprEvalBuilder = PlanProto.EvalExprNode.newBuilder();
@@ -152,6 +157,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return exprEval;
   }
 
+  @Override
   public LogicalNode visitProjection(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                      ProjectionNode projection, Stack<LogicalNode> stack) throws TajoException {
     super.visitProjection(context, plan, block, projection, stack);
@@ -189,6 +195,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return limit;
   }
 
+  @Override
   public LogicalNode visitWindowAgg(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                     WindowAggNode windowAgg, Stack<LogicalNode> stack) throws TajoException {
     super.visitWindowAgg(context, plan, block, windowAgg, stack);
@@ -263,6 +270,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return having;
   }
 
+  @Override
   public LogicalNode visitGroupBy(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                   GroupbyNode node, Stack<LogicalNode> stack) throws TajoException {
     super.visitGroupBy(context, plan, block, node, new Stack<LogicalNode>());
@@ -298,6 +306,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return nodeBuilder;
   }
 
+  @Override
   public LogicalNode visitDistinctGroupby(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                           DistinctGroupbyNode node, Stack<LogicalNode> stack) throws TajoException {
     super.visitDistinctGroupby(context, plan, block, node, new Stack<LogicalNode>());
@@ -354,6 +363,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return filter;
   }
 
+  @Override
   public LogicalNode visitJoin(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block, JoinNode join,
                           Stack<LogicalNode> stack) throws TajoException {
     super.visitJoin(context, plan, block, join, stack);
@@ -438,6 +448,26 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
   }
 
   @Override
+  public LogicalNode visitIndexScan(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    IndexScanNode node, Stack<LogicalNode> stack) throws TajoException {
+
+    PlanProto.ScanNode.Builder scanBuilder = buildScanNode(node);
+
+    PlanProto.IndexScanSpec.Builder indexScanSpecBuilder = PlanProto.IndexScanSpec.newBuilder();
+    indexScanSpecBuilder.setKeySchema(node.getKeySchema().getProto());
+    indexScanSpecBuilder.setIndexPath(node.getIndexPath().toString());
+    for (SimplePredicate predicate : node.getPredicates()) {
+      indexScanSpecBuilder.addPredicates(predicate.getProto());
+    }
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node);
+    nodeBuilder.setScan(scanBuilder);
+    nodeBuilder.setIndexScan(indexScanSpecBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+    return node;
+  }
+
+  @Override
   public LogicalNode visitPartitionedTableScan(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                           PartitionedTableScanNode node, Stack<LogicalNode> stack)
       throws TajoException {
@@ -461,6 +491,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitTableSubQuery(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                    TableSubQueryNode node, Stack<LogicalNode> stack) throws TajoException {
     super.visitTableSubQuery(context, plan, block, node, stack);
@@ -483,6 +514,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitCreateTable(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                       CreateTableNode node, Stack<LogicalNode> stack) throws TajoException {
     super.visitCreateTable(context, plan, block, node, stack);
@@ -623,6 +655,7 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
   public LogicalNode visitInsert(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  InsertNode node, Stack<LogicalNode> stack) throws TajoException {
     super.visitInsert(context, plan, block, node, stack);
@@ -711,6 +744,48 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     return node;
   }
 
+  @Override
+  public LogicalNode visitCreateIndex(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      CreateIndexNode node, Stack<LogicalNode> stack) throws TajoException {
+    super.visitCreateIndex(context, plan, block, node, new Stack<LogicalNode>());
+
+    PlanProto.CreateIndexNode.Builder createIndexBuilder = PlanProto.CreateIndexNode.newBuilder();
+    int [] childIds = registerGetChildIds(context, node);
+    createIndexBuilder.setChildSeq(childIds[0]);
+    createIndexBuilder.setIndexName(node.getIndexName());
+    createIndexBuilder.setIndexMethod(node.getIndexMethod());
+    createIndexBuilder.setIndexPath(node.getIndexPath().toString());
+    for (SortSpec sortSpec : node.getKeySortSpecs()) {
+      createIndexBuilder.addKeySortSpecs(sortSpec.getProto());
+    }
+    createIndexBuilder.setTargetRelationSchema(node.getTargetRelationSchema().getProto());
+    createIndexBuilder.setIsUnique(node.isUnique());
+    createIndexBuilder.setIsClustered(node.isClustered());
+    if (node.hasOptions()) {
+      createIndexBuilder.setIndexProperties(node.getOptions().getProto());
+    }
+    createIndexBuilder.setIsExternal(node.isExternal());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node);
+    nodeBuilder.setCreateIndex(createIndexBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
+  @Override
+  public LogicalNode visitDropIndex(SerializeContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    DropIndexNode node, Stack<LogicalNode> stack) {
+    PlanProto.DropIndexNode.Builder dropIndexBuilder = PlanProto.DropIndexNode.newBuilder();
+    dropIndexBuilder.setIndexName(node.getIndexName());
+
+    PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node);
+    nodeBuilder.setDropIndex(dropIndexBuilder);
+    context.treeBuilder.addNodes(nodeBuilder);
+
+    return node;
+  }
+
   public static PlanProto.NodeType convertType(NodeType type) {
     return PlanProto.NodeType.valueOf(type.name());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
new file mode 100644
index 0000000..73b33d5
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/IndexUtil.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.util;
+
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.plan.expr.*;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+
+public class IndexUtil {
+  
+  public static String getIndexName(String indexName , SortSpec[] keys) {
+    StringBuilder builder = new StringBuilder();
+    builder.append(indexName + "_");
+    for(int i = 0 ; i < keys.length ; i ++) {
+      builder.append(keys[i].getSortKey().getSimpleName() + "_");
+    }
+    return builder.toString();
+  }
+
+  public static List<EvalNode> getAllEqualEvals(EvalNode qual) {
+    EvalTreeUtil.EvalFinder finder = new EvalTreeUtil.EvalFinder(EvalType.EQUAL);
+    finder.visitChild(null, qual, new Stack<EvalNode>());
+    return finder.getEvalNodes();
+  }
+  
+  private static class FieldAndValueFinder implements EvalNodeVisitor {
+    private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>();
+    
+    public LinkedList<BinaryEval> getNodeList () {
+      return this.nodeList;
+    }
+    
+    @Override
+    public void visit(EvalNode node) {
+      BinaryEval binaryEval = (BinaryEval) node;
+      switch(node.getType()) {
+      case AND:
+        break;
+      case EQUAL:
+        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
+          && binaryEval.getRightExpr().getType() == EvalType.CONST ) {
+          nodeList.add(binaryEval);
+        }
+        break;
+      case IS_NULL:
+        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
+          && binaryEval.getRightExpr().getType() == EvalType.CONST) {
+          nodeList.add(binaryEval);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 9c34826..c4a4367 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -69,10 +69,31 @@ public class PlannerUtil {
         type == NodeType.CREATE_DATABASE ||
             type == NodeType.DROP_DATABASE ||
             (type == NodeType.CREATE_TABLE && !((CreateTableNode) baseNode).hasSubQuery()) ||
-            baseNode.getType() == NodeType.DROP_TABLE ||
-            baseNode.getType() == NodeType.ALTER_TABLESPACE ||
-            baseNode.getType() == NodeType.ALTER_TABLE ||
-            baseNode.getType() == NodeType.TRUNCATE_TABLE;
+            type == NodeType.DROP_TABLE ||
+            type == NodeType.ALTER_TABLESPACE ||
+            type == NodeType.ALTER_TABLE ||
+            type == NodeType.TRUNCATE_TABLE ||
+            type == NodeType.CREATE_INDEX ||
+            type == NodeType.DROP_INDEX;
+  }
+
+  /**
+   * Most update queries require only the updates to the catalog information,
+   * but some queries such as "CREATE INDEX" or CTAS requires distributed execution on multiple cluster nodes.
+   * This function checks whether the given DDL plan requires distributed execution or not.
+   * @param node the root node of a query plan
+   * @return Return true if the input query plan requires distributed execution. Otherwise, return false.
+   */
+  public static boolean isDistExecDDL(LogicalNode node) {
+    LogicalNode baseNode = node;
+    if (node instanceof LogicalRootNode) {
+      baseNode = ((LogicalRootNode) node).getChild();
+    }
+
+    NodeType type = baseNode.getType();
+
+    return type == NodeType.CREATE_INDEX && !((CreateIndexNode)baseNode).isExternal() ||
+        type == NodeType.CREATE_TABLE && ((CreateTableNode)baseNode).hasSubQuery();
   }
 
   /**
@@ -386,6 +407,12 @@ public class PlannerUtil {
         QueryBlock block, PartitionedTableScanNode node, Stack<LogicalNode> stack) {
       return node;
     }
+
+    @Override
+    public LogicalNode visitIndexScan(ReplacerContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      IndexScanNode node, Stack<LogicalNode> stack) throws TajoException {
+      return node;
+    }
   }
 
   public static void replaceNode(LogicalNode plan, LogicalNode newNode, NodeType type) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
index ebb5c80..2c8f344 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/BasicLogicalPlanVisitor.java
@@ -21,7 +21,6 @@ package org.apache.tajo.plan.visitor;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.*;
 
 import java.util.Stack;
@@ -113,6 +112,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
       case PARTITIONS_SCAN:
         current = visitPartitionedTableScan(context, plan, block, (PartitionedTableScanNode) node, stack);
         break;
+      case INDEX_SCAN:
+        current = visitIndexScan(context, plan, block, (IndexScanNode) node, stack);
+        break;
       case STORE:
         current = visitStoreTable(context, plan, block, (StoreTableNode) node, stack);
         break;
@@ -137,9 +139,15 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
       case ALTER_TABLE:
         current = visitAlterTable(context, plan, block, (AlterTableNode) node, stack);
         break;
+      case CREATE_INDEX:
+        current = visitCreateIndex(context, plan, block, (CreateIndexNode) node, stack);
+        break;
       case TRUNCATE_TABLE:
         current = visitTruncateTable(context, plan, block, (TruncateTableNode) node, stack);
         break;
+      case DROP_INDEX:
+        current = visitDropIndex(context, plan, block, (DropIndexNode) node, stack);
+        break;
       default:
         throw new TajoInternalError("Unknown logical node type: " + node.getType());
     }
@@ -312,6 +320,12 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitIndexScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node,
+                               Stack<LogicalNode> stack) throws TajoException {
+    return null;
+  }
+
+  @Override
   public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                           PartitionedTableScanNode node, Stack<LogicalNode> stack)
       throws TajoException {
@@ -379,6 +393,22 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
     }
 
   @Override
+  public RESULT visitCreateIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateIndexNode node,
+                                 Stack<LogicalNode> stack) throws TajoException {
+    RESULT result = null;
+    stack.push(node);
+    result = visit(context, plan, block, node.getChild(), stack);
+    stack.pop();
+    return result;
+  }
+
+  @Override
+  public RESULT visitDropIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropIndexNode node,
+                               Stack<LogicalNode> stack) {
+    return null;
+  }
+
+  @Override
   public RESULT visitTruncateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                    TruncateTableNode node, Stack<LogicalNode> stack) throws TajoException {
     return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
index 925742a..f826479 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/ExplainLogicalPlanVisitor.java
@@ -22,7 +22,6 @@ import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanString;
-import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.*;
 
 import java.util.Stack;
@@ -235,6 +234,19 @@ public class ExplainLogicalPlanVisitor extends BasicLogicalPlanVisitor<ExplainLo
     return node;
   }
 
+  @Override
+  public LogicalNode visitCreateIndex(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                     CreateIndexNode node, Stack<LogicalNode> stack) throws TajoException {
+    return visitUnaryNode(context, plan, block, node, stack);
+  }
+
+  @Override
+  public LogicalNode visitDropIndex(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                    DropIndexNode node, Stack<LogicalNode> stack) {
+    context.add(context.depth, node.getPlanString());
+    return node;
+  }
+
   public static String printDepthString(int maxDepth, DepthString planStr) {
     StringBuilder output = new StringBuilder();
     String pad = new String(new char[planStr.getDepth() * 3]).replace('\0', ' ');

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
index dd832ae..9e9389d 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/LogicalPlanVisitor.java
@@ -20,7 +20,6 @@ package org.apache.tajo.plan.visitor;
 
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.*;
 
 import java.util.Stack;
@@ -76,6 +75,9 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, ScanNode node,
                    Stack<LogicalNode> stack) throws TajoException;
 
+  RESULT visitIndexScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, IndexScanNode node,
+                        Stack<LogicalNode> stack) throws TajoException;
+
   RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                    PartitionedTableScanNode node, Stack<LogicalNode> stack) throws TajoException;
 
@@ -103,6 +105,12 @@ public interface LogicalPlanVisitor<CONTEXT, RESULT> {
   RESULT visitAlterTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, AlterTableNode node,
                          Stack<LogicalNode> stack) throws TajoException;
 
+  RESULT visitCreateIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, CreateIndexNode node,
+                               Stack<LogicalNode> stack) throws TajoException;
+
+  RESULT visitDropIndex(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, DropIndexNode node,
+                        Stack<LogicalNode> stack) throws TajoException;
+
   RESULT visitTruncateTable(CONTEXT context, LogicalPlan plan, LogicalPlan.QueryBlock block, TruncateTableNode node,
                          Stack<LogicalNode> stack) throws TajoException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
index 989af88..4854d7f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/visitor/SimpleAlgebraVisitor.java
@@ -20,7 +20,6 @@ package org.apache.tajo.plan.visitor;
 
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.algebra.BaseAlgebraVisitor;
 
 import java.util.Stack;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 537d180..4019322 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -46,7 +46,7 @@ enum NodeType {
   TABLE_SUBQUERY = 15;
   SCAN = 16;
   PARTITIONS_SCAN = 17;
-  BST_INDEX_SCAN = 18;
+  INDEX_SCAN = 18;
   STORE = 19;
   INSERT = 20;
 
@@ -57,6 +57,8 @@ enum NodeType {
   ALTER_TABLESPACE = 25;
   ALTER_TABLE = 26;
   TRUNCATE_TABLE = 27;
+  CREATE_INDEX = 28;
+  DROP_INDEX = 29;
 }
 
 message LogicalNodeTree {
@@ -72,31 +74,35 @@ message LogicalNode {
 
   optional ScanNode scan = 6;
   optional PartitionScanSpec partitionScan = 7;
-  optional JoinNode join = 8;
-  optional FilterNode filter = 9;
-  optional GroupbyNode groupby = 10;
-  optional DistinctGroupbyNode distinctGroupby = 11;
-  optional SortNode sort = 12;
-  optional LimitNode limit = 13;
-  optional WindowAggNode windowAgg = 14;
-  optional ProjectionNode projection = 15;
-  optional EvalExprNode exprEval = 16;
-  optional UnionNode union = 17;
-  optional TableSubQueryNode tableSubQuery = 18;
-  optional PersistentStoreNode persistentStore = 19;
-  optional StoreTableNodeSpec storeTable = 20;
-  optional InsertNodeSpec insert = 21;
-  optional CreateTableNodeSpec createTable = 22;
-  optional RootNode root = 23;
-  optional SetSessionNode setSession = 24;
-
-  optional CreateDatabaseNode createDatabase = 25;
-  optional DropDatabaseNode dropDatabase = 26;
-  optional DropTableNode dropTable = 27;
-
-  optional AlterTablespaceNode alterTablespace = 28;
-  optional AlterTableNode alterTable = 29;
-  optional TruncateTableNode truncateTableNode = 30;
+  optional IndexScanSpec indexScan = 8;
+  optional JoinNode join = 9;
+  optional FilterNode filter = 10;
+  optional GroupbyNode groupby = 11;
+  optional DistinctGroupbyNode distinctGroupby = 12;
+  optional SortNode sort = 13;
+  optional LimitNode limit = 14;
+  optional WindowAggNode windowAgg = 15;
+  optional ProjectionNode projection = 16;
+  optional EvalExprNode exprEval = 17;
+  optional UnionNode union = 18;
+  optional TableSubQueryNode tableSubQuery = 19;
+  optional PersistentStoreNode persistentStore = 20;
+  optional StoreTableNodeSpec storeTable = 21;
+  optional InsertNodeSpec insert = 22;
+  optional CreateTableNodeSpec createTable = 23;
+  optional RootNode root = 24;
+  optional SetSessionNode setSession = 25;
+
+  optional CreateDatabaseNode createDatabase = 26;
+  optional DropDatabaseNode dropDatabase = 27;
+  optional DropTableNode dropTable = 28;
+
+  optional AlterTablespaceNode alterTablespace = 29;
+  optional AlterTableNode alterTable = 30;
+  optional TruncateTableNode truncateTableNode = 31;
+
+  optional CreateIndexNode createIndex = 32;
+  optional DropIndexNode dropIndex = 33;
 }
 
 message ScanNode {
@@ -112,6 +118,17 @@ message PartitionScanSpec {
   repeated string paths = 1;
 }
 
+message IndexScanSpec {
+  required SchemaProto keySchema = 1;
+  required string indexPath = 2;
+  repeated SimplePredicateProto predicates = 3;
+}
+
+message SimplePredicateProto {
+  required SortSpecProto keySortSpec = 1;
+  required Datum value = 2;
+}
+
 message FilterNode {
   required int32 childSeq = 1;
   required EvalNodeTree qual = 2;
@@ -314,6 +331,23 @@ message AlterTableNode {
   optional AlterPartition alterPartition = 7;
 }
 
+message CreateIndexNode {
+  required int32 childSeq = 1;
+  required string indexName = 2;
+  required IndexMethod indexMethod = 3;
+  required string indexPath = 4;
+  repeated SortSpecProto keySortSpecs = 5;
+  required SchemaProto targetRelationSchema = 6;
+  optional bool isUnique = 7 [default = false];
+  optional bool isClustered = 8 [default = false];
+  optional KeyValueSetProto indexProperties = 9;
+  optional bool isExternal = 10;
+}
+
+message DropIndexNode {
+  required string indexName = 1;
+}
+
 enum EvalType {
   NOT = 0;
   AND = 1;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
index ef33a8e..d12c6bd 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
@@ -174,22 +174,6 @@ public class OldStorageManager {
   }
 
   /**
-   * Returns Scanner instance.
-   *
-   * @param conf The system property
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target The output schema
-   * @return Scanner instance
-   * @throws IOException
-   */
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
-    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
-  }
-
-  /**
    * Creates a scanner instance.
    *
    * @param theClass Concrete class of scanner

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index f8883b0..4d7fac4 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -268,6 +268,21 @@ public abstract class Tablespace {
   }
 
   /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws IOException
+   */
+  public synchronized SeekableScanner getSeekableScanner(TableMeta meta, Schema schema, FragmentProto fragment,
+                                                         Schema target) throws IOException {
+    return (SeekableScanner)this.getScanner(meta, schema, fragment, target);
+  }
+
+  /**
    * Returns Appender instance.
    * @param queryContext Query property.
    * @param taskAttemptId Task id.

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 00cac77..3060d53 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -49,6 +49,7 @@ import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.*;
@@ -1091,8 +1092,8 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public void rewritePlan(OverridableConf context, LogicalPlan plan) throws TajoException {
-    if (REWRITE_RULE.isEligible(context, plan)) {
-      REWRITE_RULE.rewrite(context, plan);
+    if (REWRITE_RULE.isEligible(new LogicalPlanRewriteRuleContext(context, plan))) {
+      REWRITE_RULE.rewrite(new LogicalPlanRewriteRuleContext(context, plan));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
index db9f3c8..40789ac 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.storage.hbase;
 
-import org.apache.tajo.OverridableConf;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -28,6 +27,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.logical.SortNode.SortPurpose;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 import org.apache.tajo.util.KeyValueSet;
 
 import java.io.IOException;
@@ -46,9 +46,9 @@ public class SortedInsertRewriter implements LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    boolean hbaseMode = "false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
-    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
+    boolean hbaseMode = "false".equalsIgnoreCase(context.getQueryContext().get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
+    LogicalRootNode rootNode = context.getPlan().getRootBlock().getRoot();
     LogicalNode node = rootNode.getChild();
     return hbaseMode && node.getType() == NodeType.CREATE_TABLE || node.getType() == NodeType.INSERT;
   }
@@ -69,7 +69,8 @@ public class SortedInsertRewriter implements LogicalPlanRewriteRule {
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException {
+    LogicalPlan plan = context.getPlan();
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
     StoreTableNode storeTable = rootNode.getChild();

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index ed010c0..5437b0d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -124,7 +124,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -147,7 +148,8 @@ public class TestBSTIndex {
     tuple = new VTuple(keySchema.size());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
     reader.open();
-    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -226,7 +228,8 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -289,7 +292,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -361,7 +365,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -384,7 +389,8 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple result;
@@ -452,7 +458,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -475,7 +482,8 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple result;
@@ -532,7 +540,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -557,7 +566,8 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     tuple.put(0, DatumFactory.createInt8(0));
@@ -616,7 +626,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -722,7 +733,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -804,7 +816,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -829,7 +842,8 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     for (int i = (TUPLE_NUM - 1); i > 0; i--) {
@@ -894,7 +908,8 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -922,7 +937,8 @@ public class TestBSTIndex {
     assertEquals(keySchema, reader.getKeySchema());
     assertEquals(comp, reader.getComparator());
 
-    scanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = OldStorageManager.getStorageManager(conf, meta.getStoreType()).
+        getSeekableScanner(meta, schema, tablet.getProto(), schema);
     scanner.init();
 
     Tuple result;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index c198965..a001492 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -111,7 +111,8 @@ public class TestSingleCSVFileBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner fileScanner = OldStorageManager.getStorageManager(conf, meta.getStoreType())
+        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
     fileScanner.init();
     Tuple keyTuple;
     long offset;
@@ -135,7 +136,8 @@ public class TestSingleCSVFileBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir,
         "FindValueInCSV.idx"), keySchema, comp);
     reader.open();
-    fileScanner = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    fileScanner = OldStorageManager.getStorageManager(conf, meta.getStoreType())
+        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
     fileScanner.init();
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
       tuple.put(0, DatumFactory.createInt8(i));
@@ -200,7 +202,8 @@ public class TestSingleCSVFileBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
     
-    SeekableScanner fileScanner  = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner fileScanner  = OldStorageManager.getStorageManager(conf, meta.getStoreType())
+        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
     fileScanner.init();
     Tuple keyTuple;
     long offset;
@@ -221,7 +224,8 @@ public class TestSingleCSVFileBSTIndex {
     
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
     reader.open();
-    fileScanner  = OldStorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
+    fileScanner  = OldStorageManager.getStorageManager(conf, meta.getStoreType())
+        .getSeekableScanner(meta, schema, tablet.getProto(), schema);
     fileScanner.init();
     Tuple result;
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {


Mime
View raw message