tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [3/5] tajo git commit: TAJO-1300: Merge the index branch into the master branch.
Date Thu, 30 Jul 2015 04:39:01 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index d9712c9..67f782a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -1362,6 +1362,15 @@ public class GlobalPlanner {
     }
 
     @Override
+    public LogicalNode visitIndexScan(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+                                      IndexScanNode node, Stack<LogicalNode> stack) throws TajoException {
+      ExecutionBlock newBlock = context.plan.newExecutionBlock();
+      newBlock.setPlan(node);
+      context.execBlockMap.put(node.getPID(), newBlock);
+      return node;
+    }
+
+    @Override
     public LogicalNode visitPartitionedTableScan(GlobalPlanContext context, LogicalPlan plan,
                                                  LogicalPlan.QueryBlock block, PartitionedTableScanNode node,
                                                  Stack<LogicalNode> stack)throws TajoException {
@@ -1407,5 +1416,19 @@ public class GlobalPlanner {
 
       return node;
     }
+
+    @Override
+    public LogicalNode visitCreateIndex(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock,
+                                        CreateIndexNode node, Stack<LogicalNode> stack) throws TajoException {
+      LogicalNode child = super.visitCreateIndex(context, plan, queryBlock, node, stack);
+
+      // Don't separate execution block. CreateIndex is pushed to the first execution block.
+      ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+      node.setChild(childBlock.getPlan());
+      childBlock.setPlan(node);
+      context.execBlockMap.put(node.getPID(), childBlock);
+
+      return node;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 28622d7..a59960f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -18,23 +18,32 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.plan.Target;
 import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
 
 public class BSTIndexScanExec extends PhysicalExec {
-  private ScanNode scanNode;
+  private IndexScanNode plan;
   private SeekableScanner fileScanner;
   
   private EvalNode qual;
@@ -48,31 +57,122 @@ public class BSTIndexScanExec extends PhysicalExec {
 
   private Tuple indexLookupKey;
 
-  public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode ,
-       FileFragment fragment, Path fileName , Schema keySchema,
-       TupleComparator comparator , Datum[] datum) throws IOException {
-    super(context, scanNode.getInSchema(), scanNode.getOutSchema());
-    this.scanNode = scanNode;
-    this.qual = scanNode.getQual();
-    indexLookupKey = new VTuple(datum);
-
-    this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(),
-        scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
-    this.fileScanner.init();
-    this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
-
-    FileSystem fs = fileName.getFileSystem(context.getConf());
-    this.reader = new BSTIndex(fs.getConf()).
-        getIndexReader(fileName, keySchema, comparator);
+  private TableStats inputStats;
+
+  private CatalogProtos.FragmentProto fragment;
+
+  private Schema keySchema;
+
+  public BSTIndexScanExec(TaskAttemptContext context, IndexScanNode plan,
+                          CatalogProtos.FragmentProto fragment, URI indexPrefix , Schema keySchema,
+                          SimplePredicate [] predicates) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema());
+    this.plan = plan;
+    this.qual = plan.getQual();
+    this.fragment = fragment;
+    this.keySchema = keySchema;
+
+    SortSpec[] keySortSpecs = new SortSpec[predicates.length];
+    Datum[] values = new Datum[predicates.length];
+    for (int i = 0; i < predicates.length; i++) {
+      keySortSpecs[i] = predicates[i].getKeySortSpec();
+      values[i] = predicates[i].getValue();
+    }
+    indexLookupKey = new VTuple(values);
+
+    TupleComparator comparator = new BaseTupleComparator(keySchema,
+        keySortSpecs);
+
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+
+    Path indexPath = new Path(indexPrefix.toString(), context.getUniqueKeyFromFragments());
+    this.reader = new BSTIndex(context.getConf()).
+        getIndexReader(indexPath, keySchema, comparator);
     this.reader.open();
   }
 
+  private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) {
+    Schema mergedSchema = new Schema();
+    Set<Column> qualAndTargets = TUtil.newHashSet();
+    qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual));
+    for (Target target : targets) {
+      qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
+    }
+    for (Column column : originalSchema.getRootColumns()) {
+      if (subSchema.contains(column)
+          || qualAndTargets.contains(column)
+          || qualAndTargets.contains(column)) {
+        mergedSchema.addColumn(column);
+      }
+    }
+    return mergedSchema;
+  }
+
   @Override
   public void init() throws IOException {
+    Schema projected;
+
+    // in the case where projected column or expression are given
+    // the target can be an empty list.
+    if (plan.hasTargets()) {
+      projected = new Schema();
+      Set<Column> columnSet = new HashSet<Column>();
+
+      if (plan.hasQual()) {
+        columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
+      }
+
+      for (Target t : plan.getTargets()) {
+        columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
+      }
+
+      for (Column column : inSchema.getAllColumns()) {
+        if (columnSet.contains(column)) {
+          projected.addColumn(column);
+        }
+      }
+
+    } else {
+      // no any projected columns, meaning that all columns should be projected.
+      // TODO - this implicit rule makes code readability bad. So, we should remove it later
+      projected = outSchema;
+    }
+
+    initScanner(projected);
     super.init();
     progress = 0.0f;
-    if (qual != null) {
-      qual.bind(context.getEvalContext(), inSchema);
+
+    if (plan.hasQual()) {
+      if (fileScanner.isProjectable()) {
+        qual.bind(context.getEvalContext(), projected);
+      } else {
+        qual.bind(context.getEvalContext(), inSchema);
+      }
+    }
+  }
+
+  private void initScanner(Schema projected) throws IOException {
+
+    // Why we should check nullity? See https://issues.apache.org/jira/browse/TAJO-1422
+    if (fragment != null) {
+
+      Schema fileScanOutSchema = mergeSubSchemas(projected, keySchema, plan.getTargets(), qual);
+
+      this.fileScanner = OldStorageManager.getStorageManager(context.getConf(),
+          plan.getTableDesc().getMeta().getStoreType())
+          .getSeekableScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragment, fileScanOutSchema);
+      this.fileScanner.init();
+
+      // See Scanner.isProjectable() method Depending on the result of isProjectable(),
+      // the width of retrieved tuple is changed.
+      //
+      // If TRUE, the retrieved tuple will contain only projected fields.
+      // If FALSE, the retrieved tuple will contain projected fields and NullDatum for non-projected fields.
+      if (fileScanner.isProjectable()) {
+        this.projector = new Projector(context, projected, outSchema, plan.getTargets());
+      } else {
+        this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+      }
     }
   }
 
@@ -102,8 +202,9 @@ public class BSTIndexScanExec extends PhysicalExec {
       fileScanner.seek(offset);
       }
     }
+
     Tuple tuple;
-    if (!scanNode.hasQual()) {
+    if (!plan.hasQual()) {
       if ((tuple = fileScanner.next()) != null) {
         return projector.eval(tuple);
       } else {
@@ -115,8 +216,11 @@ public class BSTIndexScanExec extends PhysicalExec {
            return projector.eval(tuple);
          } else {
            long offset = reader.next();
-           if (offset == -1) return null;
+           if (offset == -1) {
+             return null;
+           }
            else fileScanner.seek(offset);
+           return null;
          }
        }
      }
@@ -131,9 +235,19 @@ public class BSTIndexScanExec extends PhysicalExec {
   @Override
   public void close() throws IOException {
     IOUtils.cleanup(null, reader, fileScanner);
+    if (fileScanner != null) {
+      try {
+        TableStats stats = fileScanner.getInputStats();
+        if (stats != null) {
+          inputStats = (TableStats) stats.clone();
+        }
+      } catch (CloneNotSupportedException e) {
+        e.printStackTrace();
+      }
+    }
     reader = null;
     fileScanner = null;
-    scanNode = null;
+    plan = null;
     qual = null;
     projector = null;
     indexLookupKey = null;
@@ -143,4 +257,13 @@ public class BSTIndexScanExec extends PhysicalExec {
   public float getProgress() {
     return progress;
   }
+
+  @Override
+  public TableStats getInputStats() {
+    if (fileScanner != null) {
+      return fileScanner.getInputStats();
+    } else {
+      return inputStats;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index 8a79005..488c3ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -54,7 +54,9 @@ public class ProjectionExec extends UnaryPhysicalExec {
       return null;
     }
 
-    return projector.eval(tuple);
+    Tuple outTuple = projector.eval(tuple);
+    outTuple.setOffset(tuple.getOffset());
+    return outTuple;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
new file mode 100644
index 0000000..f9db842
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.logical.CreateIndexNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class StoreIndexExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(StoreIndexExec.class);
+  private BSTIndexWriter indexWriter;
+  private final CreateIndexNode logicalPlan;
+  private int[] indexKeys = null;
+  private Schema keySchema;
+  private TupleComparator comparator;
+
+  public StoreIndexExec(final TaskAttemptContext context, final CreateIndexNode logicalPlan,
+                        final PhysicalExec child) {
+    super(context, logicalPlan.getInSchema(), logicalPlan.getOutSchema(), child);
+    this.logicalPlan = logicalPlan;
+  }
+
+  @Override
+  public void init() throws IOException {
+    super.init();
+
+    SortSpec[] sortSpecs = logicalPlan.getKeySortSpecs();
+    indexKeys = new int[sortSpecs.length];
+    keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
+
+    Column col;
+    for (int i = 0 ; i < sortSpecs.length; i++) {
+      col = sortSpecs[i].getSortKey();
+      indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
+    }
+
+    TajoConf conf = context.getConf();
+    Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments());
+    // TODO: Create factory using reflection
+    BSTIndex bst = new BSTIndex(conf);
+    this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
+    this.indexWriter = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
+    this.indexWriter.setLoadNum(100);
+    this.indexWriter.open();
+  }
+
+  @Override
+  public Tuple next() throws IOException {
+    Tuple tuple;
+    Tuple keyTuple;
+    long offset;
+
+    while((tuple = child.next()) != null) {
+      offset = tuple.getOffset();
+      keyTuple = new VTuple(keySchema.size());
+      RowStoreUtil.project(tuple, keyTuple, indexKeys);
+      indexWriter.write(keyTuple, offset);
+    }
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+
+    indexWriter.flush();
+    IOUtils.cleanup(LOG, indexWriter);
+
+    indexWriter = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
index 6582513..54b6c5e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
@@ -18,11 +18,10 @@
 
 package org.apache.tajo.engine.utils.test;
 
-import org.apache.tajo.OverridableConf;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
 
 @SuppressWarnings("unused")
 public class ErrorInjectionRewriter implements LogicalPlanRewriteRule {
@@ -32,12 +31,12 @@ public class ErrorInjectionRewriter implements LogicalPlanRewriteRule {
   }
 
   @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
+  public boolean isEligible(LogicalPlanRewriteRuleContext context) {
     return true;
   }
 
   @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException {
+  public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException {
     throw new NullPointerException();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 9cd20b4..2ad45ba 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -96,7 +96,8 @@ public class GlobalEngine extends AbstractService {
       analyzer = new SQLAnalyzer();
       preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
       planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance());
-      optimizer = new LogicalOptimizer(context.getConf());
+      // Access path rewriter is enabled only in QueryMasterTask
+      optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog());
       annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
     } catch (Throwable t) {
       LOG.error(t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 78fc0f5..a597d32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -36,8 +36,7 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FunctionListResponse;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableResponse;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryContext;
@@ -65,7 +64,10 @@ import org.apache.tajo.util.ProtoUtil;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.exception.ExceptionUtil.printStackTraceIfError;
@@ -311,7 +313,6 @@ public class TajoMasterClientService extends AbstractService {
             .setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto())
             .setUserName(context.getConf().getVar(ConfVars.USERNAME))
             .build();
-
       }
     }
 
@@ -959,5 +960,200 @@ public class TajoMasterClientService extends AbstractService {
             .build();
       }
     }
+
+    @Override
+    public IndexResponse getIndexWithName(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String indexName, databaseName;
+        if (CatalogUtil.isFQTableName(request.getValue())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+          databaseName = splitted[0];
+          indexName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          indexName = request.getValue();
+        }
+        IndexDescProto indexProto = catalog.getIndexByName(databaseName, indexName).getProto();
+        return IndexResponse.newBuilder()
+            .setState(OK)
+            .setIndexDesc(indexProto)
+            .build();
+
+      } catch (Throwable t) {
+        return IndexResponse.newBuilder()
+            .setState(returnError(t))
+            .build();
+      }
+    }
+
+    @Override
+    public ReturnState existIndexWithName(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String indexName, databaseName;
+        if (CatalogUtil.isFQTableName(request.getValue())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+          databaseName = splitted[0];
+          indexName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          indexName = request.getValue();
+        }
+
+        if (catalog.existIndexByName(databaseName, indexName)) {
+          return OK;
+        } else {
+          return errUndefinedIndexName(indexName);
+        }
+      } catch (Throwable t) {
+        return returnError(t);
+      }
+    }
+
+    @Override
+    public IndexListResponse getIndexesForTable(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String tableName, databaseName;
+        if (CatalogUtil.isFQTableName(request.getValue())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+          databaseName = splitted[0];
+          tableName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          tableName = request.getValue();
+        }
+
+        IndexListResponse.Builder builder = IndexListResponse.newBuilder().setState(OK);
+        for (IndexDesc index : catalog.getAllIndexesByTable(databaseName, tableName)) {
+          builder.addIndexDesc(index.getProto());
+        }
+        return builder.build();
+      } catch (Throwable t) {
+        return IndexListResponse.newBuilder()
+            .setState(returnError(t))
+            .build();
+      }
+    }
+
+    @Override
+    public ReturnState existIndexesForTable(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String tableName, databaseName;
+        if (CatalogUtil.isFQTableName(request.getValue())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+          databaseName = splitted[0];
+          tableName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          tableName = request.getValue();
+        }
+        if (catalog.existIndexesByTable(databaseName, tableName)) {
+          return OK;
+        } else {
+          return errUndefinedIndex(tableName);
+        }
+      } catch (Throwable t) {
+        return returnError(t);
+      }
+    }
+
+    @Override
+    public IndexResponse getIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String tableName, databaseName;
+        if (CatalogUtil.isFQTableName(request.getTableName())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+          databaseName = splitted[0];
+          tableName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          tableName = request.getTableName();
+        }
+        String[] columnNames = new String[request.getColumnNamesCount()];
+        columnNames = request.getColumnNamesList().toArray(columnNames);
+
+        return IndexResponse.newBuilder()
+            .setState(OK)
+            .setIndexDesc(catalog.getIndexByColumnNames(databaseName, tableName, columnNames).getProto())
+            .build();
+
+      } catch (Throwable t) {
+        return IndexResponse.newBuilder()
+            .setState(returnError(t))
+            .build();
+      }
+    }
+
+    @Override
+    public ReturnState existIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+        String tableName, databaseName;
+        if (CatalogUtil.isFQTableName(request.getTableName())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+          databaseName = splitted[0];
+          tableName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          tableName = request.getTableName();
+        }
+        String[] columnNames = new String[request.getColumnNamesCount()];
+        columnNames = request.getColumnNamesList().toArray(columnNames);
+        if (catalog.existIndexByColumnNames(databaseName, tableName, columnNames)) {
+          return OK;
+        } else {
+          return errUndefinedIndex(tableName, request.getColumnNamesList());
+        }
+      } catch (Throwable t) {
+        return returnError(t);
+      }
+    }
+
+    @Override
+    public ReturnState dropIndex(RpcController controller, SessionedStringProto request)
+        throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        QueryContext queryContext = new QueryContext(conf, session);
+
+        String indexName, databaseName;
+        if (CatalogUtil.isFQTableName(request.getValue())) {
+          String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+          databaseName = splitted[0];
+          indexName = splitted[1];
+        } else {
+          databaseName = session.getCurrentDatabase();
+          indexName = request.getValue();
+        }
+        catalog.dropIndex(databaseName, indexName);
+
+        return OK;
+      } catch (Throwable t) {
+        return returnError(t);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index f6bb4f7..a535f94 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -72,45 +72,123 @@ public class DDLExecutor {
 
     switch (root.getType()) {
 
-    case ALTER_TABLESPACE:
-      AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
-      alterTablespace(context, queryContext, alterTablespace);
-      return true;
+      case ALTER_TABLESPACE:
+        AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
+        alterTablespace(context, queryContext, alterTablespace);
+        return true;
 
 
-    case CREATE_DATABASE:
-      CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
-      createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
-      return true;
-    case DROP_DATABASE:
-      DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
-      dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
-      return true;
+      case CREATE_DATABASE:
+        CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+        createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+        return true;
+      case DROP_DATABASE:
+        DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+        dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+        return true;
 
 
-    case CREATE_TABLE:
-      CreateTableNode createTable = (CreateTableNode) root;
-      createTable(queryContext, createTable, createTable.isIfNotExists());
-      return true;
-    case DROP_TABLE:
-      DropTableNode dropTable = (DropTableNode) root;
-      dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
-      return true;
-    case TRUNCATE_TABLE:
-      TruncateTableNode truncateTable = (TruncateTableNode) root;
-      truncateTable(queryContext, truncateTable);
-      return true;
+      case CREATE_TABLE:
+        CreateTableNode createTable = (CreateTableNode) root;
+        createTable(queryContext, createTable, createTable.isIfNotExists());
+        return true;
+      case DROP_TABLE:
+        DropTableNode dropTable = (DropTableNode) root;
+        dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+        return true;
+      case TRUNCATE_TABLE:
+        TruncateTableNode truncateTable = (TruncateTableNode) root;
+        truncateTable(queryContext, truncateTable);
+        return true;
 
-    case ALTER_TABLE:
-      AlterTableNode alterTable = (AlterTableNode) root;
-      alterTable(context, queryContext, alterTable);
-      return true;
+      case ALTER_TABLE:
+        AlterTableNode alterTable = (AlterTableNode) root;
+        alterTable(context, queryContext, alterTable);
+        return true;
+
+      case CREATE_INDEX:
+        CreateIndexNode createIndex = (CreateIndexNode) root;
+        createIndex(queryContext, createIndex);
+        return true;
+
+      case DROP_INDEX:
+        DropIndexNode dropIndexNode = (DropIndexNode) root;
+        dropIndex(queryContext, dropIndexNode);
+        return true;
 
     default:
       throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
     }
   }
 
+  public void createIndex(final QueryContext queryContext, final CreateIndexNode createIndexNode) {
+    String databaseName, simpleIndexName, qualifiedIndexName;
+    if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+      String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+      databaseName = splits[0];
+      simpleIndexName = splits[1];
+      qualifiedIndexName = createIndexNode.getIndexName();
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleIndexName = createIndexNode.getIndexName();
+      qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+    }
+
+    if (catalog.existIndexByName(databaseName, simpleIndexName)) {
+      throw new DuplicateIndexException(simpleIndexName);
+    }
+
+    ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
+    if (scanNode == null) {
+      throw new InternalError("Cannot find the table of the relation");
+    }
+
+    IndexDesc indexDesc = new IndexDesc(databaseName, CatalogUtil.extractSimpleName(scanNode.getTableName()),
+        simpleIndexName, createIndexNode.getIndexPath(),
+        createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
+        createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
+
+    if (catalog.createIndex(indexDesc)) {
+      LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
+    } else {
+      LOG.info("Index creation " + qualifiedIndexName + " is failed.");
+      throw new TajoInternalError("Cannot create index \"" + qualifiedIndexName + "\".");
+    }
+  }
+
+  public void dropIndex(final QueryContext queryContext, final DropIndexNode dropIndexNode) {
+    String databaseName, simpleIndexName;
+    if (CatalogUtil.isFQTableName(dropIndexNode.getIndexName())) {
+      String [] splits = CatalogUtil.splitFQTableName(dropIndexNode.getIndexName());
+      databaseName = splits[0];
+      simpleIndexName = splits[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleIndexName = dropIndexNode.getIndexName();
+    }
+
+    if (!catalog.existIndexByName(databaseName, simpleIndexName)) {
+      throw new UndefinedIndexException(simpleIndexName);
+    }
+
+    IndexDesc desc = catalog.getIndexByName(databaseName, simpleIndexName);
+
+    if (!catalog.dropIndex(databaseName, simpleIndexName)) {
+      LOG.info("Cannot drop index \"" + simpleIndexName + "\".");
+      throw new TajoInternalError("Cannot drop index \"" + simpleIndexName + "\".");
+    }
+
+    Path indexPath = new Path(desc.getIndexPath());
+    try {
+      FileSystem fs = indexPath.getFileSystem(context.getConf());
+      fs.delete(indexPath, true);
+    } catch (IOException e) {
+      throw new InternalError(e.getMessage());
+    }
+
+    LOG.info("Index " + simpleIndexName + " is dropped.");
+  }
+
   /**
    * Alter a given table
    */

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 228c0b8..b348265 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -25,7 +25,11 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
@@ -296,43 +300,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
     
     return tuples;
   }
-  
+
   private List<Tuple> getIndexes(Schema outSchema) {
-    List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
+    List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes();
     List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
     List<Column> columns = outSchema.getRootColumns();
     Tuple aTuple;
 
-    for (IndexProto index: indexList) {
+    for (IndexDescProto index: indexList) {
       aTuple = new VTuple(outSchema.size());
-      
+
       for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
         Column column = columns.get(fieldId);
-        
+
         if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
+          aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId()));
         } else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
+          aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid()));
         } else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
           aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
-        } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
-        } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
-        } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
-        } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
-        } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
-        } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
-          aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
+        } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name()));
+        } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) {
+          aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath()));
         }
       }
-      
+
       tuples.add(aTuple);
     }
-    
+
     return tuples;
   }
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 42e5f61..e7fc4d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -25,10 +25,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.*;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.DuplicateIndexException;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
@@ -71,8 +69,8 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase;
 import static org.apache.tajo.exception.ReturnStateUtil.OK;
+import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase;
 
 public class QueryExecutor {
   private static final Log LOG = LogFactory.getLog(QueryExecutor.class);
@@ -105,10 +103,17 @@ public class QueryExecutor {
 
 
     } else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
-      ddlExecutor.execute(queryContext, plan);
 
-      response.setState(OK);
-      response.setResultType(ResultType.NO_RESULT);
+      if (PlannerUtil.isDistExecDDL(rootNode)) {
+        if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) {
+          checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild());
+        }
+        executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
+      } else {
+        ddlExecutor.execute(queryContext, plan);
+        response.setState(OK);
+        response.setResultType(ResultType.NO_RESULT);
+      }
 
     } else if (plan.isExplain()) { // explain query
       execExplain(session, sql, plan, queryContext, plan.isExplainGlobal(), response);
@@ -135,7 +140,7 @@ public class QueryExecutor {
 
   public void execSetSession(Session session, LogicalPlan plan,
                              SubmitQueryResponse.Builder response) {
-    SetSessionNode setSessionNode = ((LogicalRootNode)plan.getRootBlock().getRoot()).getChild();
+    SetSessionNode setSessionNode = ((LogicalRootNode) plan.getRootBlock().getRoot()).getChild();
 
     final String varName = setSessionNode.getName();
 
@@ -523,6 +528,25 @@ public class QueryExecutor {
         " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
   }
 
+  private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
+      throws IOException {
+    String databaseName, simpleIndexName, qualifiedIndexName;
+    if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+      String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+      databaseName = splits[0];
+      simpleIndexName = splits[1];
+      qualifiedIndexName = createIndexNode.getIndexName();
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleIndexName = createIndexNode.getIndexName();
+      qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+    }
+
+    if (catalog.existIndexByName(databaseName, simpleIndexName)) {
+      throw new DuplicateIndexException(qualifiedIndexName);
+    }
+  }
+
   public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
       throws Exception {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 6fc4ea9..e3629c7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -33,6 +33,8 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.TableDesc;
@@ -43,6 +45,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.ExecutionQueue;
 import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
@@ -513,6 +516,7 @@ public class Query implements EventHandler<QueryEvent> {
         hookList.add(new MaterializedResultHook());
         hookList.add(new CreateTableHook());
         hookList.add(new InsertTableHook());
+        hookList.add(new CreateIndexHook());
       }
 
       public void execute(QueryContext queryContext, Query query,
@@ -526,6 +530,48 @@ public class Query implements EventHandler<QueryEvent> {
       }
     }
 
+    private static class CreateIndexHook implements QueryHook {
+
+      @Override
+      public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+        Stage lastStage = query.getStage(finalExecBlockId);
+        return  lastStage.getBlock().getPlan().getType() == NodeType.CREATE_INDEX;
+      }
+
+      @Override
+      public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+        CatalogService catalog = context.getWorkerContext().getCatalog();
+        Stage lastStage = query.getStage(finalExecBlockId);
+
+        CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan();
+        String databaseName, simpleIndexName, qualifiedIndexName;
+        if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+          String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+          databaseName = splits[0];
+          simpleIndexName = splits[1];
+          qualifiedIndexName = createIndexNode.getIndexName();
+        } else {
+          databaseName = queryContext.getCurrentDatabase();
+          simpleIndexName = createIndexNode.getIndexName();
+          qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+        }
+        ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
+        if (scanNode == null) {
+          throw new IOException("Cannot find the table of the relation");
+        }
+        IndexDesc indexDesc = new IndexDesc(databaseName, CatalogUtil.extractSimpleName(scanNode.getTableName()),
+            simpleIndexName, createIndexNode.getIndexPath(),
+            createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
+            createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
+        if (catalog.createIndex(indexDesc)) {
+          LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
+        } else {
+          LOG.info("Index creation " + qualifiedIndexName + " is failed.");
+          throw new TajoInternalError("Cannot create index \"" + qualifiedIndexName + "\".");
+        }
+      }
+    }
+
     private static class MaterializedResultHook implements QueryHook {
 
       @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 611560d..ac1bab5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -313,11 +313,10 @@ public class QueryMasterTask extends CompositeService {
         LOG.warn("Query already started");
         return;
       }
-
-
+      LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
       CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
       LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
-      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
+      LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
       jsonExpr = null; // remove the possible OOM
 
@@ -346,6 +345,14 @@ public class QueryMasterTask extends CompositeService {
             tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
           }
         }
+
+        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN);
+        if (scanNodes != null) {
+          for (LogicalNode eachScanNode : scanNodes) {
+            ScanNode scanNode = (ScanNode) eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+          }
+        }
       }
       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
       queryMasterContext.getGlobalPlanner().build(queryContext, masterPlan);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
deleted file mode 100644
index 23d245b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util;
-
-import com.google.gson.Gson;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.IndexScanNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-public class IndexUtil {
-  public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
-    StringBuilder builder = new StringBuilder(); 
-    builder.append(fragment.getPath().getName() + "_");
-    builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_");
-    for(int i = 0 ; i < keys.length ; i ++) {
-      builder.append(keys[i].getSortKey().getSimpleName()+"_");
-    }
-    builder.append("_index");
-    return builder.toString();
-       
-  }
-  
-  public static String getIndexName(String indexName , SortSpec[] keys) {
-    StringBuilder builder = new StringBuilder();
-    builder.append(indexName + "_");
-    for(int i = 0 ; i < keys.length ; i ++) {
-      builder.append(keys[i].getSortKey().getSimpleName() + "_");
-    }
-    return builder.toString();
-  }
-  
-  public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
-      Iterator<Entry<String, String>> iter ) {
-   
-    EvalNode qual = scanNode.getQual();
-    Gson gson = CoreGsonHelper.getInstance();
-    
-    FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
-    qual.preOrder(nodeFinder);
-    LinkedList<BinaryEval> nodeList = nodeFinder.getNodeList();
-    
-    int maxSize = Integer.MIN_VALUE;
-    SortSpec[] maxIndex = null;
-    
-    String json;
-    while(iter.hasNext()) {
-      Entry<String , String> entry = iter.next();
-      json = entry.getValue();
-      SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class);
-      if(sortKey.length > nodeList.size()) {
-        /* If the number of the sort key is greater than where condition, 
-         * this index cannot be used
-         * */
-        continue; 
-      } else {
-        boolean[] equal = new boolean[sortKey.length];
-        for(int i = 0 ; i < sortKey.length ; i ++) {
-          for(int j = 0 ; j < nodeList.size() ; j ++) {
-            Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef();
-            if(col.equals(sortKey[i].getSortKey())) {
-              equal[i] = true;
-            }
-          }
-        }
-        boolean chk = true;
-        for(int i = 0 ; i < equal.length ; i ++) {
-          chk = chk && equal[i];
-        }
-        if(chk) {
-          if(maxSize < sortKey.length) {
-            maxSize = sortKey.length;
-            maxIndex = sortKey;
-          }
-        }
-      }
-    }
-    if(maxIndex == null) {
-      return null;
-    } else {
-      Schema keySchema = new Schema();
-      for(int i = 0 ; i < maxIndex.length ; i ++ ) {
-        keySchema.addColumn(maxIndex[i].getSortKey());
-      }
-      Datum[] datum = new Datum[nodeList.size()];
-      for(int i = 0 ; i < nodeList.size() ; i ++ ) {
-        datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
-      }
-      
-      return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
-    }
-
-  }
-  
-  
-  private static class FieldAndValueFinder implements EvalNodeVisitor {
-    private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>();
-    
-    public LinkedList<BinaryEval> getNodeList () {
-      return this.nodeList;
-    }
-    
-    @Override
-    public void visit(EvalNode node) {
-      BinaryEval binaryEval = (BinaryEval) node;
-      switch(node.getType()) {
-      case AND:
-        break;
-      case EQUAL:
-        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
-          && binaryEval.getRightExpr().getType() == EvalType.CONST ) {
-          nodeList.add(binaryEval);
-        }
-        break;
-      case IS_NULL:
-        if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
-          && binaryEval.getRightExpr().getType() == EvalType.CONST) {
-          nodeList.add(binaryEval);
-        }
-        break;
-      default:
-        break;
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index edb5703..281e23e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -26,8 +26,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.ReturnStateUtil;
 import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
 import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
 import org.apache.tajo.ipc.QueryMasterClientProtocol;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index d020639..5d7a53a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -367,7 +367,18 @@ public class TaskAttemptContext {
     }
     return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
   }
-  
+
+  public String getUniqueKeyFromFragments() {
+    StringBuilder sb = new StringBuilder();
+    for (List<FragmentProto> fragments : fragmentMap.values()) {
+      for (FragmentProto f : fragments) {
+        FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f);
+        sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
+      }
+    }
+    return sb.toString();
+  }
+
   public int hashCode() {
     return Objects.hashCode(taskId);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 82ea479..92c682c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -125,21 +125,9 @@ public class TaskImpl implements Task {
 
   public void initPlan() throws IOException {
     plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
-    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
-    if (scanNode != null) {
-      for (LogicalNode node : scanNode) {
-        ScanNode scan = (ScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
-
-    LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
-    if (partitionScanNode != null) {
-      for (LogicalNode node : partitionScanNode) {
-        PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
+    updateDescsForScanNodes(NodeType.SCAN);
+    updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
+    updateDescsForScanNodes(NodeType.INDEX_SCAN);
 
     interQuery = request.getProto().getInterQuery();
     if (interQuery) {
@@ -179,6 +167,17 @@ public class TaskImpl implements Task {
     LOG.info("==================================");
   }
 
+  private void updateDescsForScanNodes(NodeType nodeType) {
+    assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
+    LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
+    if (scanNodes != null) {
+      for (LogicalNode node : scanNodes) {
+        ScanNode scanNode = (ScanNode) node;
+        descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+      }
+    }
+  }
+
   private void startScriptExecutors() throws IOException {
     for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
       executor.start(systemConf);

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
index f91288d..eb67167 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
@@ -18,13 +18,11 @@
 
 package org.apache.tajo.ws.rs.resources;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.exception.ReturnStateUtil;
-import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
 import org.apache.tajo.master.QueryInProgress;
 import org.apache.tajo.master.QueryInfo;
@@ -263,7 +261,7 @@ public class QueryResource {
         return ResourcesUtil.createBadRequestResponse(LOG, "Provided session id (" + sessionId + ") is invalid.");
       }
       
-      SubmitQueryResponse response = 
+      SubmitQueryResponse response =
         masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false);
       if (ReturnStateUtil.isError(response.getState())) {
         return ResourcesUtil.createExceptionResponse(LOG, response.getState().getMessage());

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 3fcb15e..b5e464b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -901,6 +901,9 @@ public class QueryTestCaseBase {
         if (isLocalTable) {
           createdTableGlobalSet.remove(tableName);
         }
+      } else if (expr.getType() == OpType.CreateIndex) {
+        // TODO: index existence check
+        client.executeQuery(compiled);
       } else {
         assertTrue(ddlFilePath + " is not a Create or Drop Table statement", false);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
index b371be2..aa8070e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
@@ -18,13 +18,22 @@
 
 package org.apache.tajo.cli.tools;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.util.FileUtil;
 import org.junit.Test;
 
 import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.PrintWriter;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestTajoDump extends QueryTestCaseBase {
 
   @Test
@@ -33,16 +42,18 @@ public class TestTajoDump extends QueryTestCaseBase {
       executeString("CREATE TABLE \"" + getCurrentDatabase() +
           "\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)");
 
-      UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      PrintWriter printWriter = new PrintWriter(bos);
-      TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
-      printWriter.flush();
-      printWriter.close();
-      assertStrings(new String(bos.toByteArray()));
-      bos.close();
-
-      executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\"");
+      try {
+        UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        PrintWriter printWriter = new PrintWriter(bos);
+        TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
+        printWriter.flush();
+        printWriter.close();
+        assertStrings(new String(bos.toByteArray()));
+        bos.close();
+      } finally {
+        executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\"");
+      }
     }
   }
 
@@ -52,16 +63,62 @@ public class TestTajoDump extends QueryTestCaseBase {
       executeString("CREATE TABLE \"" + getCurrentDatabase() +
           "\".\"TableName2\" (\"Age\" int, \"Name\" Record (\"FirstName\" TEXT, lastname TEXT))");
 
-      UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      PrintWriter printWriter = new PrintWriter(bos);
-      TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
-      printWriter.flush();
-      printWriter.close();
-      assertStrings(new String(bos.toByteArray()));
-      bos.close();
+      try {
+        UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        PrintWriter printWriter = new PrintWriter(bos);
+        TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
+        printWriter.flush();
+        printWriter.close();
+        assertStrings(new String(bos.toByteArray()));
+        bos.close();
+      } finally {
+        executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName2\"");
+      }
+    }
+  }
+
+  @Test
+  public void testDump3() throws Exception {
+    if (!testingCluster.isHiveCatalogStoreRunning()) {
+      executeString("CREATE TABLE \"" + getCurrentDatabase() +
+          "\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)");
+
+      executeString("CREATE INDEX test_idx on \"" + getCurrentDatabase()
+          + "\".\"TableName1\" ( \"Age\" asc null first, \"FirstName\" desc null last )");
+
+      try {
+        UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        PrintWriter printWriter = new PrintWriter(bos);
+        TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
+        printWriter.flush();
+        printWriter.close();
+        assertOutputResult("testDump3.result", new String(bos.toByteArray()), new String[]{"${index.path}"},
+            new String[]{TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "test_idx").toString()});
+        bos.close();
+      } finally {
+        executeString("DROP INDEX test_idx");
+        executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\"");
+      }
+    }
+  }
+
+  private void assertOutputResult(String expectedResultFile, String actual, String[] paramKeys, String[] paramValues)
+      throws Exception {
+    FileSystem fs = currentResultPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+    Path resultFile = StorageUtil.concatPath(currentResultPath, expectedResultFile);
+    assertTrue(resultFile.toString() + " existence check", fs.exists(resultFile));
+
+    String expectedResult = FileUtil.readTextFile(new File(resultFile.toUri()));
 
-      executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName2\"");
+    if (paramKeys != null) {
+      for (int i = 0; i < paramKeys.length; i++) {
+        if (i < paramValues.length) {
+          expectedResult = expectedResult.replace(paramKeys[i], paramValues[i]);
+        }
+      }
     }
+    assertEquals(expectedResult.trim(), actual.trim());
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 7629711..abd0973 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -35,8 +35,8 @@ import org.apache.tajo.engine.codegen.TajoClassLoader;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.function.FunctionSignature;
 import org.apache.tajo.master.exec.QueryExecutor;
 import org.apache.tajo.plan.*;
@@ -44,9 +44,6 @@ import org.apache.tajo.plan.expr.EvalContext;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.serder.EvalNodeDeserializer;
 import org.apache.tajo.plan.serder.EvalNodeSerializer;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.VerificationState;
@@ -60,6 +57,7 @@ import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.datetime.DateTimeUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.apache.tajo.plan.serder.PlanProto;
 
 import java.io.IOException;
 import java.util.List;
@@ -68,9 +66,7 @@ import java.util.TimeZone;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 public class ExprTestBase {
   private static TajoTestingCluster util;
@@ -106,7 +102,7 @@ public class ExprTestBase {
     analyzer = new SQLAnalyzer();
     preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
     planner = new LogicalPlanner(cat, TablespaceManager.getInstance());
-    optimizer = new LogicalOptimizer(util.getConfiguration());
+    optimizer = new LogicalOptimizer(util.getConfiguration(), cat);
     annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
index eb4a3f4..368e89f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
@@ -126,7 +126,7 @@ public class TestJoinOrderAlgorithm {
 
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
-    optimizer = new LogicalOptimizer(util.getConfiguration());
+    optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 10e7e37..640d88b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -104,9 +104,10 @@ public class TestLogicalOptimizer {
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
-    optimizer = new LogicalOptimizer(util.getConfiguration());
+    optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
+    optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 097e232..02e921a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -40,7 +40,6 @@ import org.apache.tajo.engine.function.builtin.SumInt;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.plan.*;
 import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
@@ -151,14 +150,14 @@ public class TestLogicalPlanner {
       "select name, empid, e.deptname, manager from employee as e, dept as dp", // 1
       "select name, empid, e.deptname, manager, score from employee as e, dept, score", // 2
       "select p.deptname, sumtest(score) from dept as p, score group by p.deptName having sumtest(score) > 30", // 3
-      "select p.deptname, score from dept as p, score order by score asc", // 4
+      "select p.deptname, score*200 from dept as p, score order by score*10 asc", // 4
       "select name from employee where empId = 100", // 5
       "select name, score from employee, score", // 6
       "select p.deptName, sumtest(score) from dept as p, score group by p.deptName", // 7
       "create table store1 as select p.deptName, sumtest(score) from dept as p, score group by p.deptName", // 8
       "select deptName, sumtest(score) from score group by deptName having sumtest(score) > 30", // 9
       "select 7 + 8 as res1, 8 * 9 as res2, 10 * 10 as res3", // 10
-      "create index idx_employee on employee using bitmap (name null first, empId desc) with ('fillfactor' = 70)", // 11
+      "create index idx_employee on employee using bitmap_idx (name null first, empId desc) where empid > 100", // 11
       "select name, score from employee, score order by score limit 3", // 12
       "select length(name), length(deptname), *, empid+10 from employee where empId > 500", // 13
   };
@@ -512,7 +511,7 @@ public class TestLogicalPlanner {
     Schema expected = tpch.getOutSchema("q2");
     assertSchema(expected, node.getOutSchema());
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.JOIN);
@@ -551,7 +550,7 @@ public class TestLogicalPlanner {
     LogicalNode node = plan.getRootBlock().getRoot();
     testJsonSerDerObject(node);
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -592,7 +591,7 @@ public class TestLogicalPlanner {
     LogicalNode node = plan.getRootBlock().getRoot();
     testJsonSerDerObject(node);
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -639,7 +638,7 @@ public class TestLogicalPlanner {
     LogicalNode node = plan.getRootBlock().getRoot();
     testJsonSerDerObject(node);
 
-    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+    LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
     optimizer.optimize(plan);
 
     Map<BinaryEval, Boolean> scanMap = TUtil.newHashMap();
@@ -896,6 +895,29 @@ public class TestLogicalPlanner {
   }
 
   @Test
+  public final void testCreateIndexNode() throws TajoException {
+    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    Expr expr = sqlAnalyzer.parse(QUERIES[11]);
+    LogicalPlan rootNode = planner.createPlan(qc, expr);
+    LogicalNode plan = rootNode.getRootBlock().getRoot();
+    testJsonSerDerObject(plan);
+
+    LogicalRootNode root = (LogicalRootNode) plan;
+    assertEquals(NodeType.CREATE_INDEX, root.getChild().getType());
+    CreateIndexNode createIndexNode = root.getChild();
+
+    assertEquals(NodeType.PROJECTION, createIndexNode.getChild().getType());
+    ProjectionNode projNode = createIndexNode.getChild();
+
+    assertEquals(NodeType.SELECTION, projNode.getChild().getType());
+    SelectionNode selNode = projNode.getChild();
+
+    assertEquals(NodeType.SCAN, selNode.getChild().getType());
+    ScanNode scanNode = selNode.getChild();
+    assertEquals(CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), scanNode.getTableName());
+  }
+
+  @Test
   public final void testAsterisk() throws CloneNotSupportedException, TajoException {
     QueryContext qc = createQueryContext();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
deleted file mode 100644
index 6fb7a45..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.Stack;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-
-public class TestBSTIndexExec {
-
-  private TajoConf conf;
-  private Path idxPath;
-  private CatalogService catalog;
-  private SQLAnalyzer analyzer;
-  private LogicalPlanner planner;
-  private LogicalOptimizer optimizer;
-  private FileTablespace sm;
-  private Schema idxSchema;
-  private BaseTupleComparator comp;
-  private BSTIndex.BSTIndexWriter writer;
-  private HashMap<Integer , Integer> randomValues ;
-  private int rndKey = -1;
-  private FileSystem fs;
-  private TableMeta meta;
-  private Path tablePath;
-
-  private Random rnd = new Random(System.currentTimeMillis());
-
-  private TajoTestingCluster util;
-
-  @Before
-  public void setup() throws Exception {
-    this.randomValues = new HashMap<Integer, Integer>();
-    this.conf = new TajoConf();
-    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
-    util = new TajoTestingCluster();
-    util.startCatalogCluster();
-    catalog = util.getMiniCatalogCluster().getCatalog();
-
-    Path workDir = CommonTestingUtil.getTestDir();
-    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
-    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = TablespaceManager.getLocalFs();
-
-    idxPath = new Path(workDir, "test.idx");
-
-    Schema schema = new Schema();
-    schema.addColumn("managerid", Type.INT4);
-    schema.addColumn("empid", Type.INT4);
-    schema.addColumn("deptname", Type.TEXT);
-
-    this.idxSchema = new Schema();
-    idxSchema.addColumn("managerid", Type.INT4);
-    SortSpec[] sortKeys = new SortSpec[1];
-    sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
-    this.comp = new BaseTupleComparator(idxSchema, sortKeys);
-
-    this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
-        BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
-    writer.setLoadNum(100);
-    writer.open();
-    long offset;
-
-    meta = CatalogUtil.newTableMeta("TEXT");
-    tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
-    fs = tablePath.getFileSystem(conf);
-    fs.mkdirs(tablePath.getParent());
-
-    FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath);
-    appender.init();
-    VTuple tuple = new VTuple(schema.size());
-    for (int i = 0; i < 10000; i++) {
-      
-      VTuple key = new VTuple(this.idxSchema.size());
-      int rndKey = rnd.nextInt(250);
-      if(this.randomValues.containsKey(rndKey)) {
-        int t = this.randomValues.remove(rndKey) + 1;
-        this.randomValues.put(rndKey, t);
-      } else {
-        this.randomValues.put(rndKey, 1);
-      }
-      
-      key.put(new Datum[] { DatumFactory.createInt4(rndKey) });
-      tuple.put(new Datum[] { DatumFactory.createInt4(rndKey),
-          DatumFactory.createInt4(rnd.nextInt(10)),
-          DatumFactory.createText("dept_" + rnd.nextInt(10)) });
-      offset = appender.getOffset();
-      appender.addTuple(tuple);
-      writer.write(key, offset);
-    }
-    appender.flush();
-    appender.close();
-    writer.close();
-
-    TableDesc desc = new TableDesc(
-      CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
-      tablePath.toUri());
-    catalog.createTable(desc);
-
-    analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
-    optimizer = new LogicalOptimizer(conf);
-  }
-
-  @After
-  public void tearDown() {
-    util.shutdownCatalogCluster();
-  }
-
-  @Test
-  public void testEqual() throws Exception {
-    this.rndKey = rnd.nextInt(250);
-    final String QUERY = "select * from employee where managerId = " + rndKey;
-    
-    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
-    Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testEqual");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
-        LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
-    Expr expr = analyzer.parse(QUERY);
-    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
-    LogicalNode rootNode = optimizer.optimize(plan);
-
-    TmpPlanner phyPlanner = new TmpPlanner(conf);
-    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
-    int tupleCount = this.randomValues.get(rndKey);
-    int counter = 0;
-    exec.init();
-    while (exec.next() != null) {
-      counter ++;
-    }
-    exec.close();
-    assertEquals(tupleCount , counter);
-  }
-
-  private class TmpPlanner extends PhysicalPlannerImpl {
-    public TmpPlanner(TajoConf conf) {
-      super(conf);
-    }
-
-    @Override
-    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
-        throws IOException {
-      Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
-          "Error: There is no table matched to %s", scanNode.getTableName());
-
-      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName()));
-      
-      Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
-
-      return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
-
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 292c414..b9ba2de 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -62,6 +62,7 @@ public class TestHashAntiJoinExec {
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
   private Path testDir;
+  private QueryContext queryContext;
 
   private TableDesc employee;
   private TableDesc people;
@@ -126,11 +127,12 @@ public class TestHashAntiJoinExec {
     appender.flush();
     appender.close();
 
+    queryContext = new QueryContext(conf);
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
-    optimizer = new LogicalOptimizer(conf);
+    optimizer = new LogicalOptimizer(conf, catalog);
   }
 
   @After
@@ -157,7 +159,7 @@ public class TestHashAntiJoinExec {
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testHashAntiJoin");
-    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
         LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
     ctx.setEnforcer(new Enforcer());
     Expr expr = analyzer.parse(QUERIES[0]);


Mime
View raw message