http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index d9712c9..67f782a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -1362,6 +1362,15 @@ public class GlobalPlanner {
}
@Override
+ public LogicalNode visitIndexScan(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
+ IndexScanNode node, Stack<LogicalNode> stack) throws TajoException {
+ ExecutionBlock newBlock = context.plan.newExecutionBlock();
+ newBlock.setPlan(node);
+ context.execBlockMap.put(node.getPID(), newBlock);
+ return node;
+ }
+
+ @Override
public LogicalNode visitPartitionedTableScan(GlobalPlanContext context, LogicalPlan plan,
LogicalPlan.QueryBlock block, PartitionedTableScanNode node,
Stack<LogicalNode> stack)throws TajoException {
@@ -1407,5 +1416,19 @@ public class GlobalPlanner {
return node;
}
+
+ @Override
+ public LogicalNode visitCreateIndex(GlobalPlanContext context, LogicalPlan plan, LogicalPlan.QueryBlock queryBlock,
+ CreateIndexNode node, Stack<LogicalNode> stack) throws TajoException {
+ LogicalNode child = super.visitCreateIndex(context, plan, queryBlock, node, stack);
+
+ // Don't separate execution block. CreateIndex is pushed to the first execution block.
+ ExecutionBlock childBlock = context.execBlockMap.remove(child.getPID());
+ node.setChild(childBlock.getPlan());
+ childBlock.setPlan(node);
+ context.execBlockMap.put(node.getPID(), childBlock);
+
+ return node;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 28622d7..a59960f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -18,23 +18,32 @@
package org.apache.tajo.engine.planner.physical;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.expr.EvalTreeUtil;
+import org.apache.tajo.plan.logical.IndexScanNode;
+import org.apache.tajo.plan.rewrite.rules.IndexScanInfo.SimplePredicate;
import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
public class BSTIndexScanExec extends PhysicalExec {
- private ScanNode scanNode;
+ private IndexScanNode plan;
private SeekableScanner fileScanner;
private EvalNode qual;
@@ -48,31 +57,122 @@ public class BSTIndexScanExec extends PhysicalExec {
private Tuple indexLookupKey;
- public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode ,
- FileFragment fragment, Path fileName , Schema keySchema,
- TupleComparator comparator , Datum[] datum) throws IOException {
- super(context, scanNode.getInSchema(), scanNode.getOutSchema());
- this.scanNode = scanNode;
- this.qual = scanNode.getQual();
- indexLookupKey = new VTuple(datum);
-
- this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(),
- scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
- this.fileScanner.init();
- this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
-
- FileSystem fs = fileName.getFileSystem(context.getConf());
- this.reader = new BSTIndex(fs.getConf()).
- getIndexReader(fileName, keySchema, comparator);
+ private TableStats inputStats;
+
+ private CatalogProtos.FragmentProto fragment;
+
+ private Schema keySchema;
+
+ public BSTIndexScanExec(TaskAttemptContext context, IndexScanNode plan,
+ CatalogProtos.FragmentProto fragment, URI indexPrefix , Schema keySchema,
+ SimplePredicate [] predicates) throws IOException {
+ super(context, plan.getInSchema(), plan.getOutSchema());
+ this.plan = plan;
+ this.qual = plan.getQual();
+ this.fragment = fragment;
+ this.keySchema = keySchema;
+
+ SortSpec[] keySortSpecs = new SortSpec[predicates.length];
+ Datum[] values = new Datum[predicates.length];
+ for (int i = 0; i < predicates.length; i++) {
+ keySortSpecs[i] = predicates[i].getKeySortSpec();
+ values[i] = predicates[i].getValue();
+ }
+ indexLookupKey = new VTuple(values);
+
+ TupleComparator comparator = new BaseTupleComparator(keySchema,
+ keySortSpecs);
+
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+
+ Path indexPath = new Path(indexPrefix.toString(), context.getUniqueKeyFromFragments());
+ this.reader = new BSTIndex(context.getConf()).
+ getIndexReader(indexPath, keySchema, comparator);
this.reader.open();
}
+ private static Schema mergeSubSchemas(Schema originalSchema, Schema subSchema, Target[] targets, EvalNode qual) {
+ Schema mergedSchema = new Schema();
+ Set<Column> qualAndTargets = TUtil.newHashSet();
+ qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(qual));
+ for (Target target : targets) {
+ qualAndTargets.addAll(EvalTreeUtil.findUniqueColumns(target.getEvalTree()));
+ }
+ for (Column column : originalSchema.getRootColumns()) {
+ if (subSchema.contains(column)
+ || qualAndTargets.contains(column)
+ || qualAndTargets.contains(column)) {
+ mergedSchema.addColumn(column);
+ }
+ }
+ return mergedSchema;
+ }
+
@Override
public void init() throws IOException {
+ Schema projected;
+
+ // in the case where projected column or expression are given
+ // the target can be an empty list.
+ if (plan.hasTargets()) {
+ projected = new Schema();
+ Set<Column> columnSet = new HashSet<Column>();
+
+ if (plan.hasQual()) {
+ columnSet.addAll(EvalTreeUtil.findUniqueColumns(qual));
+ }
+
+ for (Target t : plan.getTargets()) {
+ columnSet.addAll(EvalTreeUtil.findUniqueColumns(t.getEvalTree()));
+ }
+
+ for (Column column : inSchema.getAllColumns()) {
+ if (columnSet.contains(column)) {
+ projected.addColumn(column);
+ }
+ }
+
+ } else {
+ // no any projected columns, meaning that all columns should be projected.
+ // TODO - this implicit rule makes code readability bad. So, we should remove it later
+ projected = outSchema;
+ }
+
+ initScanner(projected);
super.init();
progress = 0.0f;
- if (qual != null) {
- qual.bind(context.getEvalContext(), inSchema);
+
+ if (plan.hasQual()) {
+ if (fileScanner.isProjectable()) {
+ qual.bind(context.getEvalContext(), projected);
+ } else {
+ qual.bind(context.getEvalContext(), inSchema);
+ }
+ }
+ }
+
+ private void initScanner(Schema projected) throws IOException {
+
+ // Why we should check nullity? See https://issues.apache.org/jira/browse/TAJO-1422
+ if (fragment != null) {
+
+ Schema fileScanOutSchema = mergeSubSchemas(projected, keySchema, plan.getTargets(), qual);
+
+ this.fileScanner = OldStorageManager.getStorageManager(context.getConf(),
+ plan.getTableDesc().getMeta().getStoreType())
+ .getSeekableScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragment, fileScanOutSchema);
+ this.fileScanner.init();
+
+ // See Scanner.isProjectable() method Depending on the result of isProjectable(),
+ // the width of retrieved tuple is changed.
+ //
+ // If TRUE, the retrieved tuple will contain only projected fields.
+ // If FALSE, the retrieved tuple will contain projected fields and NullDatum for non-projected fields.
+ if (fileScanner.isProjectable()) {
+ this.projector = new Projector(context, projected, outSchema, plan.getTargets());
+ } else {
+ this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
+ }
}
}
@@ -102,8 +202,9 @@ public class BSTIndexScanExec extends PhysicalExec {
fileScanner.seek(offset);
}
}
+
Tuple tuple;
- if (!scanNode.hasQual()) {
+ if (!plan.hasQual()) {
if ((tuple = fileScanner.next()) != null) {
return projector.eval(tuple);
} else {
@@ -115,8 +216,11 @@ public class BSTIndexScanExec extends PhysicalExec {
return projector.eval(tuple);
} else {
long offset = reader.next();
- if (offset == -1) return null;
+ if (offset == -1) {
+ return null;
+ }
else fileScanner.seek(offset);
+ return null;
}
}
}
@@ -131,9 +235,19 @@ public class BSTIndexScanExec extends PhysicalExec {
@Override
public void close() throws IOException {
IOUtils.cleanup(null, reader, fileScanner);
+ if (fileScanner != null) {
+ try {
+ TableStats stats = fileScanner.getInputStats();
+ if (stats != null) {
+ inputStats = (TableStats) stats.clone();
+ }
+ } catch (CloneNotSupportedException e) {
+ e.printStackTrace();
+ }
+ }
reader = null;
fileScanner = null;
- scanNode = null;
+ plan = null;
qual = null;
projector = null;
indexLookupKey = null;
@@ -143,4 +257,13 @@ public class BSTIndexScanExec extends PhysicalExec {
public float getProgress() {
return progress;
}
+
+ @Override
+ public TableStats getInputStats() {
+ if (fileScanner != null) {
+ return fileScanner.getInputStats();
+ } else {
+ return inputStats;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index 8a79005..488c3ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -54,7 +54,9 @@ public class ProjectionExec extends UnaryPhysicalExec {
return null;
}
- return projector.eval(tuple);
+ Tuple outTuple = projector.eval(tuple);
+ outTuple.setOffset(tuple.getOffset());
+ return outTuple;
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
new file mode 100644
index 0000000..f9db842
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreIndexExec.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.logical.CreateIndexNode;
+import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class StoreIndexExec extends UnaryPhysicalExec {
+ private static final Log LOG = LogFactory.getLog(StoreIndexExec.class);
+ private BSTIndexWriter indexWriter;
+ private final CreateIndexNode logicalPlan;
+ private int[] indexKeys = null;
+ private Schema keySchema;
+ private TupleComparator comparator;
+
+ public StoreIndexExec(final TaskAttemptContext context, final CreateIndexNode logicalPlan,
+ final PhysicalExec child) {
+ super(context, logicalPlan.getInSchema(), logicalPlan.getOutSchema(), child);
+ this.logicalPlan = logicalPlan;
+ }
+
+ @Override
+ public void init() throws IOException {
+ super.init();
+
+ SortSpec[] sortSpecs = logicalPlan.getKeySortSpecs();
+ indexKeys = new int[sortSpecs.length];
+ keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs);
+
+ Column col;
+ for (int i = 0 ; i < sortSpecs.length; i++) {
+ col = sortSpecs[i].getSortKey();
+ indexKeys[i] = inSchema.getColumnId(col.getQualifiedName());
+ }
+
+ TajoConf conf = context.getConf();
+ Path indexPath = new Path(logicalPlan.getIndexPath().toString(), context.getUniqueKeyFromFragments());
+ // TODO: Create factory using reflection
+ BSTIndex bst = new BSTIndex(conf);
+ this.comparator = new BaseTupleComparator(keySchema, sortSpecs);
+ this.indexWriter = bst.getIndexWriter(indexPath, BSTIndex.TWO_LEVEL_INDEX, keySchema, comparator);
+ this.indexWriter.setLoadNum(100);
+ this.indexWriter.open();
+ }
+
+ @Override
+ public Tuple next() throws IOException {
+ Tuple tuple;
+ Tuple keyTuple;
+ long offset;
+
+ while((tuple = child.next()) != null) {
+ offset = tuple.getOffset();
+ keyTuple = new VTuple(keySchema.size());
+ RowStoreUtil.project(tuple, keyTuple, indexKeys);
+ indexWriter.write(keyTuple, offset);
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+
+ indexWriter.flush();
+ IOUtils.cleanup(LOG, indexWriter);
+
+ indexWriter = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
index 6582513..54b6c5e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java
@@ -18,11 +18,10 @@
package org.apache.tajo.engine.utils.test;
-import org.apache.tajo.OverridableConf;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRuleContext;
@SuppressWarnings("unused")
public class ErrorInjectionRewriter implements LogicalPlanRewriteRule {
@@ -32,12 +31,12 @@ public class ErrorInjectionRewriter implements LogicalPlanRewriteRule {
}
@Override
- public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
+ public boolean isEligible(LogicalPlanRewriteRuleContext context) {
return true;
}
@Override
- public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws TajoException {
+ public LogicalPlan rewrite(LogicalPlanRewriteRuleContext context) throws TajoException {
throw new NullPointerException();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 9cd20b4..2ad45ba 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -96,7 +96,8 @@ public class GlobalEngine extends AbstractService {
analyzer = new SQLAnalyzer();
preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(context.getConf());
+ // Access path rewriter is enabled only in QueryMasterTask
+ optimizer = new LogicalOptimizer(context.getConf(), context.getCatalog());
annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 78fc0f5..a597d32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -36,8 +36,7 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.FunctionListResponse;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableResponse;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryContext;
@@ -65,7 +64,10 @@ import org.apache.tajo.util.ProtoUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.exception.ExceptionUtil.printStackTraceIfError;
@@ -311,7 +313,6 @@ public class TajoMasterClientService extends AbstractService {
.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto())
.setUserName(context.getConf().getVar(ConfVars.USERNAME))
.build();
-
}
}
@@ -959,5 +960,200 @@ public class TajoMasterClientService extends AbstractService {
.build();
}
}
+
+ @Override
+ public IndexResponse getIndexWithName(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String indexName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ indexName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ indexName = request.getValue();
+ }
+ IndexDescProto indexProto = catalog.getIndexByName(databaseName, indexName).getProto();
+ return IndexResponse.newBuilder()
+ .setState(OK)
+ .setIndexDesc(indexProto)
+ .build();
+
+ } catch (Throwable t) {
+ return IndexResponse.newBuilder()
+ .setState(returnError(t))
+ .build();
+ }
+ }
+
+ @Override
+ public ReturnState existIndexWithName(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String indexName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ indexName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ indexName = request.getValue();
+ }
+
+ if (catalog.existIndexByName(databaseName, indexName)) {
+ return OK;
+ } else {
+ return errUndefinedIndexName(indexName);
+ }
+ } catch (Throwable t) {
+ return returnError(t);
+ }
+ }
+
+ @Override
+ public IndexListResponse getIndexesForTable(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getValue();
+ }
+
+ IndexListResponse.Builder builder = IndexListResponse.newBuilder().setState(OK);
+ for (IndexDesc index : catalog.getAllIndexesByTable(databaseName, tableName)) {
+ builder.addIndexDesc(index.getProto());
+ }
+ return builder.build();
+ } catch (Throwable t) {
+ return IndexListResponse.newBuilder()
+ .setState(returnError(t))
+ .build();
+ }
+ }
+
+ @Override
+ public ReturnState existIndexesForTable(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getValue();
+ }
+ if (catalog.existIndexesByTable(databaseName, tableName)) {
+ return OK;
+ } else {
+ return errUndefinedIndex(tableName);
+ }
+ } catch (Throwable t) {
+ return returnError(t);
+ }
+ }
+
+ @Override
+ public IndexResponse getIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getTableName())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getTableName();
+ }
+ String[] columnNames = new String[request.getColumnNamesCount()];
+ columnNames = request.getColumnNamesList().toArray(columnNames);
+
+ return IndexResponse.newBuilder()
+ .setState(OK)
+ .setIndexDesc(catalog.getIndexByColumnNames(databaseName, tableName, columnNames).getProto())
+ .build();
+
+ } catch (Throwable t) {
+ return IndexResponse.newBuilder()
+ .setState(returnError(t))
+ .build();
+ }
+ }
+
+ @Override
+ public ReturnState existIndexWithColumns(RpcController controller, GetIndexWithColumnsRequest request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+
+ String tableName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getTableName())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getTableName());
+ databaseName = splitted[0];
+ tableName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ tableName = request.getTableName();
+ }
+ String[] columnNames = new String[request.getColumnNamesCount()];
+ columnNames = request.getColumnNamesList().toArray(columnNames);
+ if (catalog.existIndexByColumnNames(databaseName, tableName, columnNames)) {
+ return OK;
+ } else {
+ return errUndefinedIndex(tableName, request.getColumnNamesList());
+ }
+ } catch (Throwable t) {
+ return returnError(t);
+ }
+ }
+
+ @Override
+ public ReturnState dropIndex(RpcController controller, SessionedStringProto request)
+ throws ServiceException {
+ try {
+ context.getSessionManager().touch(request.getSessionId().getId());
+ Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+ QueryContext queryContext = new QueryContext(conf, session);
+
+ String indexName, databaseName;
+ if (CatalogUtil.isFQTableName(request.getValue())) {
+ String [] splitted = CatalogUtil.splitFQTableName(request.getValue());
+ databaseName = splitted[0];
+ indexName = splitted[1];
+ } else {
+ databaseName = session.getCurrentDatabase();
+ indexName = request.getValue();
+ }
+ catalog.dropIndex(databaseName, indexName);
+
+ return OK;
+ } catch (Throwable t) {
+ return returnError(t);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index f6bb4f7..a535f94 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -72,45 +72,123 @@ public class DDLExecutor {
switch (root.getType()) {
- case ALTER_TABLESPACE:
- AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
- alterTablespace(context, queryContext, alterTablespace);
- return true;
+ case ALTER_TABLESPACE:
+ AlterTablespaceNode alterTablespace = (AlterTablespaceNode) root;
+ alterTablespace(context, queryContext, alterTablespace);
+ return true;
- case CREATE_DATABASE:
- CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
- createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
- return true;
- case DROP_DATABASE:
- DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
- dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
- return true;
+ case CREATE_DATABASE:
+ CreateDatabaseNode createDatabase = (CreateDatabaseNode) root;
+ createDatabase(queryContext, createDatabase.getDatabaseName(), null, createDatabase.isIfNotExists());
+ return true;
+ case DROP_DATABASE:
+ DropDatabaseNode dropDatabaseNode = (DropDatabaseNode) root;
+ dropDatabase(queryContext, dropDatabaseNode.getDatabaseName(), dropDatabaseNode.isIfExists());
+ return true;
- case CREATE_TABLE:
- CreateTableNode createTable = (CreateTableNode) root;
- createTable(queryContext, createTable, createTable.isIfNotExists());
- return true;
- case DROP_TABLE:
- DropTableNode dropTable = (DropTableNode) root;
- dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
- return true;
- case TRUNCATE_TABLE:
- TruncateTableNode truncateTable = (TruncateTableNode) root;
- truncateTable(queryContext, truncateTable);
- return true;
+ case CREATE_TABLE:
+ CreateTableNode createTable = (CreateTableNode) root;
+ createTable(queryContext, createTable, createTable.isIfNotExists());
+ return true;
+ case DROP_TABLE:
+ DropTableNode dropTable = (DropTableNode) root;
+ dropTable(queryContext, dropTable.getTableName(), dropTable.isIfExists(), dropTable.isPurge());
+ return true;
+ case TRUNCATE_TABLE:
+ TruncateTableNode truncateTable = (TruncateTableNode) root;
+ truncateTable(queryContext, truncateTable);
+ return true;
- case ALTER_TABLE:
- AlterTableNode alterTable = (AlterTableNode) root;
- alterTable(context, queryContext, alterTable);
- return true;
+ case ALTER_TABLE:
+ AlterTableNode alterTable = (AlterTableNode) root;
+ alterTable(context, queryContext, alterTable);
+ return true;
+
+ case CREATE_INDEX:
+ CreateIndexNode createIndex = (CreateIndexNode) root;
+ createIndex(queryContext, createIndex);
+ return true;
+
+ case DROP_INDEX:
+ DropIndexNode dropIndexNode = (DropIndexNode) root;
+ dropIndex(queryContext, dropIndexNode);
+ return true;
default:
throw new InternalError("updateQuery cannot handle such query: \n" + root.toJson());
}
}
+ public void createIndex(final QueryContext queryContext, final CreateIndexNode createIndexNode) {
+ String databaseName, simpleIndexName, qualifiedIndexName;
+ if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+ String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ qualifiedIndexName = createIndexNode.getIndexName();
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = createIndexNode.getIndexName();
+ qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+ }
+
+ if (catalog.existIndexByName(databaseName, simpleIndexName)) {
+ throw new DuplicateIndexException(simpleIndexName);
+ }
+
+ ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
+ if (scanNode == null) {
+ throw new InternalError("Cannot find the table of the relation");
+ }
+
+ IndexDesc indexDesc = new IndexDesc(databaseName, CatalogUtil.extractSimpleName(scanNode.getTableName()),
+ simpleIndexName, createIndexNode.getIndexPath(),
+ createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
+ createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
+
+ if (catalog.createIndex(indexDesc)) {
+ LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
+ } else {
+ LOG.info("Index creation " + qualifiedIndexName + " is failed.");
+ throw new TajoInternalError("Cannot create index \"" + qualifiedIndexName + "\".");
+ }
+ }
+
+ public void dropIndex(final QueryContext queryContext, final DropIndexNode dropIndexNode) {
+ String databaseName, simpleIndexName;
+ if (CatalogUtil.isFQTableName(dropIndexNode.getIndexName())) {
+ String [] splits = CatalogUtil.splitFQTableName(dropIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = dropIndexNode.getIndexName();
+ }
+
+ if (!catalog.existIndexByName(databaseName, simpleIndexName)) {
+ throw new UndefinedIndexException(simpleIndexName);
+ }
+
+ IndexDesc desc = catalog.getIndexByName(databaseName, simpleIndexName);
+
+ if (!catalog.dropIndex(databaseName, simpleIndexName)) {
+ LOG.info("Cannot drop index \"" + simpleIndexName + "\".");
+ throw new TajoInternalError("Cannot drop index \"" + simpleIndexName + "\".");
+ }
+
+ Path indexPath = new Path(desc.getIndexPath());
+ try {
+ FileSystem fs = indexPath.getFileSystem(context.getConf());
+ fs.delete(indexPath, true);
+ } catch (IOException e) {
+ throw new InternalError(e.getMessage());
+ }
+
+ LOG.info("Index " + simpleIndexName + " is dropped.");
+ }
+
/**
* Alter a given table
*/
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index 228c0b8..b348265 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -25,7 +25,11 @@ import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
import org.apache.tajo.TaskAttemptId;
import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.DataType;
@@ -296,43 +300,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
return tuples;
}
-
+
private List<Tuple> getIndexes(Schema outSchema) {
- List<IndexProto> indexList = masterContext.getCatalog().getAllIndexes();
+ List<IndexDescProto> indexList = masterContext.getCatalog().getAllIndexes();
List<Tuple> tuples = new ArrayList<Tuple>(indexList.size());
List<Column> columns = outSchema.getRootColumns();
Tuple aTuple;
- for (IndexProto index: indexList) {
+ for (IndexDescProto index: indexList) {
aTuple = new VTuple(outSchema.size());
-
+
for (int fieldId = 0; fieldId < columns.size(); fieldId++) {
Column column = columns.get(fieldId);
-
+
if ("db_id".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createInt4(index.getDbId()));
+ aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getDbId()));
} else if ("tid".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createInt4(index.getTId()));
+ aTuple.put(fieldId, DatumFactory.createInt4(index.getTableIdentifier().getTid()));
} else if ("index_name".equalsIgnoreCase(column.getSimpleName())) {
aTuple.put(fieldId, DatumFactory.createText(index.getIndexName()));
- } else if ("column_name".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getColumnName()));
- } else if ("data_type".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getDataType()));
- } else if ("index_type".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createText(index.getIndexType()));
- } else if ("is_unique".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsUnique()));
- } else if ("is_clustered".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsClustered()));
- } else if ("is_ascending".equalsIgnoreCase(column.getSimpleName())) {
- aTuple.put(fieldId, DatumFactory.createBool(index.getIsAscending()));
+ } else if ("index_method".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getIndexMethod().name()));
+ } else if ("index_path".equalsIgnoreCase(column.getSimpleName())) {
+ aTuple.put(fieldId, DatumFactory.createText(index.getIndexPath()));
}
}
-
+
tuples.add(aTuple);
}
-
+
return tuples;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 42e5f61..e7fc4d2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -25,10 +25,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.*;
-import org.apache.tajo.catalog.CatalogService;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.DuplicateIndexException;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
@@ -71,8 +69,8 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase;
import static org.apache.tajo.exception.ReturnStateUtil.OK;
+import static org.apache.tajo.exception.ReturnStateUtil.errUndefinedDatabase;
public class QueryExecutor {
private static final Log LOG = LogFactory.getLog(QueryExecutor.class);
@@ -105,10 +103,17 @@ public class QueryExecutor {
} else if (PlannerUtil.checkIfDDLPlan(rootNode)) {
- ddlExecutor.execute(queryContext, plan);
- response.setState(OK);
- response.setResultType(ResultType.NO_RESULT);
+ if (PlannerUtil.isDistExecDDL(rootNode)) {
+ if (rootNode.getChild().getType() == NodeType.CREATE_INDEX) {
+ checkIndexExistence(queryContext, (CreateIndexNode) rootNode.getChild());
+ }
+ executeDistributedQuery(queryContext, session, plan, sql, jsonExpr, response);
+ } else {
+ ddlExecutor.execute(queryContext, plan);
+ response.setState(OK);
+ response.setResultType(ResultType.NO_RESULT);
+ }
} else if (plan.isExplain()) { // explain query
execExplain(session, sql, plan, queryContext, plan.isExplainGlobal(), response);
@@ -135,7 +140,7 @@ public class QueryExecutor {
public void execSetSession(Session session, LogicalPlan plan,
SubmitQueryResponse.Builder response) {
- SetSessionNode setSessionNode = ((LogicalRootNode)plan.getRootBlock().getRoot()).getChild();
+ SetSessionNode setSessionNode = ((LogicalRootNode) plan.getRootBlock().getRoot()).getChild();
final String varName = setSessionNode.getName();
@@ -523,6 +528,25 @@ public class QueryExecutor {
" is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
}
+ private void checkIndexExistence(final QueryContext queryContext, final CreateIndexNode createIndexNode)
+ throws IOException {
+ String databaseName, simpleIndexName, qualifiedIndexName;
+ if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+ String[] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ qualifiedIndexName = createIndexNode.getIndexName();
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = createIndexNode.getIndexName();
+ qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+ }
+
+ if (catalog.existIndexByName(databaseName, simpleIndexName)) {
+ throw new DuplicateIndexException(qualifiedIndexName);
+ }
+ }
+
public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
throws Exception {
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 6fc4ea9..e3629c7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -33,6 +33,8 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryVars;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.CatalogException;
import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
@@ -43,6 +45,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
import org.apache.tajo.engine.planner.global.ExecutionQueue;
import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
@@ -513,6 +516,7 @@ public class Query implements EventHandler<QueryEvent> {
hookList.add(new MaterializedResultHook());
hookList.add(new CreateTableHook());
hookList.add(new InsertTableHook());
+ hookList.add(new CreateIndexHook());
}
public void execute(QueryContext queryContext, Query query,
@@ -526,6 +530,48 @@ public class Query implements EventHandler<QueryEvent> {
}
}
+ private static class CreateIndexHook implements QueryHook {
+
+ @Override
+ public boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+ Stage lastStage = query.getStage(finalExecBlockId);
+ return lastStage.getBlock().getPlan().getType() == NodeType.CREATE_INDEX;
+ }
+
+ @Override
+ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) throws Exception {
+ CatalogService catalog = context.getWorkerContext().getCatalog();
+ Stage lastStage = query.getStage(finalExecBlockId);
+
+ CreateIndexNode createIndexNode = (CreateIndexNode) lastStage.getBlock().getPlan();
+ String databaseName, simpleIndexName, qualifiedIndexName;
+ if (CatalogUtil.isFQTableName(createIndexNode.getIndexName())) {
+ String [] splits = CatalogUtil.splitFQTableName(createIndexNode.getIndexName());
+ databaseName = splits[0];
+ simpleIndexName = splits[1];
+ qualifiedIndexName = createIndexNode.getIndexName();
+ } else {
+ databaseName = queryContext.getCurrentDatabase();
+ simpleIndexName = createIndexNode.getIndexName();
+ qualifiedIndexName = CatalogUtil.buildFQName(databaseName, simpleIndexName);
+ }
+ ScanNode scanNode = PlannerUtil.findTopNode(createIndexNode, NodeType.SCAN);
+ if (scanNode == null) {
+ throw new IOException("Cannot find the table of the relation");
+ }
+ IndexDesc indexDesc = new IndexDesc(databaseName, CatalogUtil.extractSimpleName(scanNode.getTableName()),
+ simpleIndexName, createIndexNode.getIndexPath(),
+ createIndexNode.getKeySortSpecs(), createIndexNode.getIndexMethod(),
+ createIndexNode.isUnique(), false, scanNode.getLogicalSchema());
+ if (catalog.createIndex(indexDesc)) {
+ LOG.info("Index " + qualifiedIndexName + " is created for the table " + scanNode.getTableName() + ".");
+ } else {
+ LOG.info("Index creation " + qualifiedIndexName + " is failed.");
+ throw new TajoInternalError("Cannot create index \"" + qualifiedIndexName + "\".");
+ }
+ }
+ }
+
private static class MaterializedResultHook implements QueryHook {
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 611560d..ac1bab5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -313,11 +313,10 @@ public class QueryMasterTask extends CompositeService {
LOG.warn("Query already started");
return;
}
-
-
+ LOG.info(SessionVars.INDEX_ENABLED.keyname() + " : " + queryContext.getBool(SessionVars.INDEX_ENABLED));
CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
+ LogicalOptimizer optimizer = new LogicalOptimizer(systemConf, catalog);
Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
jsonExpr = null; // remove the possible OOM
@@ -346,6 +345,14 @@ public class QueryMasterTask extends CompositeService {
tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
}
}
+
+ scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.INDEX_SCAN);
+ if (scanNodes != null) {
+ for (LogicalNode eachScanNode : scanNodes) {
+ ScanNode scanNode = (ScanNode) eachScanNode;
+ tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+ }
+ }
}
MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
queryMasterContext.getGlobalPlanner().build(queryContext, masterPlan);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
deleted file mode 100644
index 23d245b..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.util;
-
-import com.google.gson.Gson;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.IndexScanNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map.Entry;
-
-public class IndexUtil {
- public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
- StringBuilder builder = new StringBuilder();
- builder.append(fragment.getPath().getName() + "_");
- builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_");
- for(int i = 0 ; i < keys.length ; i ++) {
- builder.append(keys[i].getSortKey().getSimpleName()+"_");
- }
- builder.append("_index");
- return builder.toString();
-
- }
-
- public static String getIndexName(String indexName , SortSpec[] keys) {
- StringBuilder builder = new StringBuilder();
- builder.append(indexName + "_");
- for(int i = 0 ; i < keys.length ; i ++) {
- builder.append(keys[i].getSortKey().getSimpleName() + "_");
- }
- return builder.toString();
- }
-
- public static IndexScanNode indexEval(LogicalPlan plan, ScanNode scanNode,
- Iterator<Entry<String, String>> iter ) {
-
- EvalNode qual = scanNode.getQual();
- Gson gson = CoreGsonHelper.getInstance();
-
- FieldAndValueFinder nodeFinder = new FieldAndValueFinder();
- qual.preOrder(nodeFinder);
- LinkedList<BinaryEval> nodeList = nodeFinder.getNodeList();
-
- int maxSize = Integer.MIN_VALUE;
- SortSpec[] maxIndex = null;
-
- String json;
- while(iter.hasNext()) {
- Entry<String , String> entry = iter.next();
- json = entry.getValue();
- SortSpec[] sortKey = gson.fromJson(json, SortSpec[].class);
- if(sortKey.length > nodeList.size()) {
- /* If the number of the sort key is greater than where condition,
- * this index cannot be used
- * */
- continue;
- } else {
- boolean[] equal = new boolean[sortKey.length];
- for(int i = 0 ; i < sortKey.length ; i ++) {
- for(int j = 0 ; j < nodeList.size() ; j ++) {
- Column col = ((FieldEval)(nodeList.get(j).getLeftExpr())).getColumnRef();
- if(col.equals(sortKey[i].getSortKey())) {
- equal[i] = true;
- }
- }
- }
- boolean chk = true;
- for(int i = 0 ; i < equal.length ; i ++) {
- chk = chk && equal[i];
- }
- if(chk) {
- if(maxSize < sortKey.length) {
- maxSize = sortKey.length;
- maxIndex = sortKey;
- }
- }
- }
- }
- if(maxIndex == null) {
- return null;
- } else {
- Schema keySchema = new Schema();
- for(int i = 0 ; i < maxIndex.length ; i ++ ) {
- keySchema.addColumn(maxIndex[i].getSortKey());
- }
- Datum[] datum = new Datum[nodeList.size()];
- for(int i = 0 ; i < nodeList.size() ; i ++ ) {
- datum[i] = ((ConstEval)(nodeList.get(i).getRightExpr())).getValue();
- }
-
- return new IndexScanNode(plan.newPID(), scanNode, keySchema , datum , maxIndex);
- }
-
- }
-
-
- private static class FieldAndValueFinder implements EvalNodeVisitor {
- private LinkedList<BinaryEval> nodeList = new LinkedList<BinaryEval>();
-
- public LinkedList<BinaryEval> getNodeList () {
- return this.nodeList;
- }
-
- @Override
- public void visit(EvalNode node) {
- BinaryEval binaryEval = (BinaryEval) node;
- switch(node.getType()) {
- case AND:
- break;
- case EQUAL:
- if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
- && binaryEval.getRightExpr().getType() == EvalType.CONST ) {
- nodeList.add(binaryEval);
- }
- break;
- case IS_NULL:
- if( binaryEval.getLeftExpr().getType() == EvalType.FIELD
- && binaryEval.getRightExpr().getType() == EvalType.CONST) {
- nodeList.add(binaryEval);
- }
- break;
- default:
- break;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index edb5703..281e23e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -26,8 +26,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.tajo.QueryId;
-import org.apache.tajo.exception.ReturnStateUtil;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.ReturnStateUtil;
import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse;
import org.apache.tajo.ipc.ClientProtos.QueryIdRequest;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index d020639..5d7a53a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -367,7 +367,18 @@ public class TaskAttemptContext {
}
return fragmentMap.get(id).toArray(new FragmentProto[fragmentMap.get(id).size()]);
}
-
+
+ public String getUniqueKeyFromFragments() {
+ StringBuilder sb = new StringBuilder();
+ for (List<FragmentProto> fragments : fragmentMap.values()) {
+ for (FragmentProto f : fragments) {
+ FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, f);
+ sb.append(fileFragment.getPath().getName()).append(fileFragment.getStartKey()).append(fileFragment.getLength());
+ }
+ }
+ return sb.toString();
+ }
+
public int hashCode() {
return Objects.hashCode(taskId);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 82ea479..92c682c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -125,21 +125,9 @@ public class TaskImpl implements Task {
public void initPlan() throws IOException {
plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
- LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
- if (scanNode != null) {
- for (LogicalNode node : scanNode) {
- ScanNode scan = (ScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
-
- LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
- if (partitionScanNode != null) {
- for (LogicalNode node : partitionScanNode) {
- PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
- descs.put(scan.getCanonicalName(), scan.getTableDesc());
- }
- }
+ updateDescsForScanNodes(NodeType.SCAN);
+ updateDescsForScanNodes(NodeType.PARTITIONS_SCAN);
+ updateDescsForScanNodes(NodeType.INDEX_SCAN);
interQuery = request.getProto().getInterQuery();
if (interQuery) {
@@ -179,6 +167,17 @@ public class TaskImpl implements Task {
LOG.info("==================================");
}
+ private void updateDescsForScanNodes(NodeType nodeType) {
+ assert nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.INDEX_SCAN;
+ LogicalNode[] scanNodes = PlannerUtil.findAllNodes(plan, nodeType);
+ if (scanNodes != null) {
+ for (LogicalNode node : scanNodes) {
+ ScanNode scanNode = (ScanNode) node;
+ descs.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+ }
+ }
+ }
+
private void startScriptExecutors() throws IOException {
for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
executor.start(systemConf);
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
index f91288d..eb67167 100644
--- a/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/ws/rs/resources/QueryResource.java
@@ -18,13 +18,11 @@
package org.apache.tajo.ws.rs.resources;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.exception.ReturnStateUtil;
-import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
import org.apache.tajo.master.QueryInProgress;
import org.apache.tajo.master.QueryInfo;
@@ -263,7 +261,7 @@ public class QueryResource {
return ResourcesUtil.createBadRequestResponse(LOG, "Provided session id (" + sessionId + ") is invalid.");
}
- SubmitQueryResponse response =
+ SubmitQueryResponse response =
masterContext.getGlobalEngine().executeQuery(session, request.getQuery(), false);
if (ReturnStateUtil.isError(response.getState())) {
return ResourcesUtil.createExceptionResponse(LOG, response.getState().getMessage());
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 3fcb15e..b5e464b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -901,6 +901,9 @@ public class QueryTestCaseBase {
if (isLocalTable) {
createdTableGlobalSet.remove(tableName);
}
+ } else if (expr.getType() == OpType.CreateIndex) {
+ // TODO: index existence check
+ client.executeQuery(compiled);
} else {
assertTrue(ddlFilePath + " is not a Create or Drop Table statement", false);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
index b371be2..aa8070e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java
@@ -18,13 +18,22 @@
package org.apache.tajo.cli.tools;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.util.FileUtil;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
+import java.io.File;
import java.io.PrintWriter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class TestTajoDump extends QueryTestCaseBase {
@Test
@@ -33,16 +42,18 @@ public class TestTajoDump extends QueryTestCaseBase {
executeString("CREATE TABLE \"" + getCurrentDatabase() +
"\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)");
- UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- PrintWriter printWriter = new PrintWriter(bos);
- TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
- printWriter.flush();
- printWriter.close();
- assertStrings(new String(bos.toByteArray()));
- bos.close();
-
- executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\"");
+ try {
+ UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintWriter printWriter = new PrintWriter(bos);
+ TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
+ printWriter.flush();
+ printWriter.close();
+ assertStrings(new String(bos.toByteArray()));
+ bos.close();
+ } finally {
+ executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\"");
+ }
}
}
@@ -52,16 +63,62 @@ public class TestTajoDump extends QueryTestCaseBase {
executeString("CREATE TABLE \"" + getCurrentDatabase() +
"\".\"TableName2\" (\"Age\" int, \"Name\" Record (\"FirstName\" TEXT, lastname TEXT))");
- UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- PrintWriter printWriter = new PrintWriter(bos);
- TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
- printWriter.flush();
- printWriter.close();
- assertStrings(new String(bos.toByteArray()));
- bos.close();
+ try {
+ UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintWriter printWriter = new PrintWriter(bos);
+ TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
+ printWriter.flush();
+ printWriter.close();
+ assertStrings(new String(bos.toByteArray()));
+ bos.close();
+ } finally {
+ executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName2\"");
+ }
+ }
+ }
+
+ @Test
+ public void testDump3() throws Exception {
+ if (!testingCluster.isHiveCatalogStoreRunning()) {
+ executeString("CREATE TABLE \"" + getCurrentDatabase() +
+ "\".\"TableName1\" (\"Age\" int, \"FirstName\" TEXT, lastname TEXT)");
+
+ executeString("CREATE INDEX test_idx on \"" + getCurrentDatabase()
+ + "\".\"TableName1\" ( \"Age\" asc null first, \"FirstName\" desc null last )");
+
+ try {
+ UserRoleInfo userInfo = UserRoleInfo.getCurrentUser();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ PrintWriter printWriter = new PrintWriter(bos);
+ TajoDump.dump(client, userInfo, getCurrentDatabase(), false, false, false, printWriter);
+ printWriter.flush();
+ printWriter.close();
+ assertOutputResult("testDump3.result", new String(bos.toByteArray()), new String[]{"${index.path}"},
+ new String[]{TablespaceManager.getDefault().getTableUri(getCurrentDatabase(), "test_idx").toString()});
+ bos.close();
+ } finally {
+ executeString("DROP INDEX test_idx");
+ executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName1\"");
+ }
+ }
+ }
+
+ private void assertOutputResult(String expectedResultFile, String actual, String[] paramKeys, String[] paramValues)
+ throws Exception {
+ FileSystem fs = currentResultPath.getFileSystem(testBase.getTestingCluster().getConfiguration());
+ Path resultFile = StorageUtil.concatPath(currentResultPath, expectedResultFile);
+ assertTrue(resultFile.toString() + " existence check", fs.exists(resultFile));
+
+ String expectedResult = FileUtil.readTextFile(new File(resultFile.toUri()));
- executeString("DROP TABLE \"" + getCurrentDatabase() + "\".\"TableName2\"");
+ if (paramKeys != null) {
+ for (int i = 0; i < paramKeys.length; i++) {
+ if (i < paramValues.length) {
+ expectedResult = expectedResult.replace(paramKeys[i], paramValues[i]);
+ }
+ }
}
+ assertEquals(expectedResult.trim(), actual.trim());
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 7629711..abd0973 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -35,8 +35,8 @@ import org.apache.tajo.engine.codegen.TajoClassLoader;
import org.apache.tajo.engine.function.FunctionLoader;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.function.FunctionSignature;
import org.apache.tajo.master.exec.QueryExecutor;
import org.apache.tajo.plan.*;
@@ -44,9 +44,6 @@ import org.apache.tajo.plan.expr.EvalContext;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.serder.EvalNodeDeserializer;
import org.apache.tajo.plan.serder.EvalNodeSerializer;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.plan.serder.PlanProto;
import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
import org.apache.tajo.plan.verifier.VerificationState;
@@ -60,6 +57,7 @@ import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.datetime.DateTimeUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.apache.tajo.plan.serder.PlanProto;
import java.io.IOException;
import java.util.List;
@@ -68,9 +66,7 @@ import java.util.TimeZone;
import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
public class ExprTestBase {
private static TajoTestingCluster util;
@@ -106,7 +102,7 @@ public class ExprTestBase {
analyzer = new SQLAnalyzer();
preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
planner = new LogicalPlanner(cat, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), cat);
annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
index eb4a3f4..368e89f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestJoinOrderAlgorithm.java
@@ -126,7 +126,7 @@ public class TestJoinOrderAlgorithm {
sqlAnalyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 10e7e37..640d88b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -104,9 +104,10 @@ public class TestLogicalOptimizer {
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
+ optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 097e232..02e921a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -40,7 +40,6 @@ import org.apache.tajo.engine.function.builtin.SumInt;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.plan.*;
import org.apache.tajo.exception.TajoException;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
@@ -151,14 +150,14 @@ public class TestLogicalPlanner {
"select name, empid, e.deptname, manager from employee as e, dept as dp", // 1
"select name, empid, e.deptname, manager, score from employee as e, dept, score", // 2
"select p.deptname, sumtest(score) from dept as p, score group by p.deptName having sumtest(score) > 30", // 3
- "select p.deptname, score from dept as p, score order by score asc", // 4
+ "select p.deptname, score*200 from dept as p, score order by score*10 asc", // 4
"select name from employee where empId = 100", // 5
"select name, score from employee, score", // 6
"select p.deptName, sumtest(score) from dept as p, score group by p.deptName", // 7
"create table store1 as select p.deptName, sumtest(score) from dept as p, score group by p.deptName", // 8
"select deptName, sumtest(score) from score group by deptName having sumtest(score) > 30", // 9
"select 7 + 8 as res1, 8 * 9 as res2, 10 * 10 as res3", // 10
- "create index idx_employee on employee using bitmap (name null first, empId desc) with ('fillfactor' = 70)", // 11
+ "create index idx_employee on employee using bitmap_idx (name null first, empId desc) where empid > 100", // 11
"select name, score from employee, score order by score limit 3", // 12
"select length(name), length(deptname), *, empid+10 from employee where empId > 500", // 13
};
@@ -512,7 +511,7 @@ public class TestLogicalPlanner {
Schema expected = tpch.getOutSchema("q2");
assertSchema(expected, node.getOutSchema());
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.JOIN);
@@ -551,7 +550,7 @@ public class TestLogicalPlanner {
LogicalNode node = plan.getRootBlock().getRoot();
testJsonSerDerObject(node);
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -592,7 +591,7 @@ public class TestLogicalPlanner {
LogicalNode node = plan.getRootBlock().getRoot();
testJsonSerDerObject(node);
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
LogicalNode[] nodes = PlannerUtil.findAllNodes(node, NodeType.SCAN);
@@ -639,7 +638,7 @@ public class TestLogicalPlanner {
LogicalNode node = plan.getRootBlock().getRoot();
testJsonSerDerObject(node);
- LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration());
+ LogicalOptimizer optimizer = new LogicalOptimizer(util.getConfiguration(), catalog);
optimizer.optimize(plan);
Map<BinaryEval, Boolean> scanMap = TUtil.newHashMap();
@@ -896,6 +895,29 @@ public class TestLogicalPlanner {
}
@Test
+ public final void testCreateIndexNode() throws TajoException {
+ QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ Expr expr = sqlAnalyzer.parse(QUERIES[11]);
+ LogicalPlan rootNode = planner.createPlan(qc, expr);
+ LogicalNode plan = rootNode.getRootBlock().getRoot();
+ testJsonSerDerObject(plan);
+
+ LogicalRootNode root = (LogicalRootNode) plan;
+ assertEquals(NodeType.CREATE_INDEX, root.getChild().getType());
+ CreateIndexNode createIndexNode = root.getChild();
+
+ assertEquals(NodeType.PROJECTION, createIndexNode.getChild().getType());
+ ProjectionNode projNode = createIndexNode.getChild();
+
+ assertEquals(NodeType.SELECTION, projNode.getChild().getType());
+ SelectionNode selNode = projNode.getChild();
+
+ assertEquals(NodeType.SCAN, selNode.getChild().getType());
+ ScanNode scanNode = selNode.getChild();
+ assertEquals(CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "employee"), scanNode.getTableName());
+ }
+
+ @Test
public final void testAsterisk() throws CloneNotSupportedException, TajoException {
QueryContext qc = createQueryContext();
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
deleted file mode 100644
index 6fb7a45..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.Stack;
-
-import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
-import static org.junit.Assert.assertEquals;
-
-public class TestBSTIndexExec {
-
- private TajoConf conf;
- private Path idxPath;
- private CatalogService catalog;
- private SQLAnalyzer analyzer;
- private LogicalPlanner planner;
- private LogicalOptimizer optimizer;
- private FileTablespace sm;
- private Schema idxSchema;
- private BaseTupleComparator comp;
- private BSTIndex.BSTIndexWriter writer;
- private HashMap<Integer , Integer> randomValues ;
- private int rndKey = -1;
- private FileSystem fs;
- private TableMeta meta;
- private Path tablePath;
-
- private Random rnd = new Random(System.currentTimeMillis());
-
- private TajoTestingCluster util;
-
- @Before
- public void setup() throws Exception {
- this.randomValues = new HashMap<Integer, Integer>();
- this.conf = new TajoConf();
- conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
- util = new TajoTestingCluster();
- util.startCatalogCluster();
- catalog = util.getMiniCatalogCluster().getCatalog();
-
- Path workDir = CommonTestingUtil.getTestDir();
- catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
- catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
- sm = TablespaceManager.getLocalFs();
-
- idxPath = new Path(workDir, "test.idx");
-
- Schema schema = new Schema();
- schema.addColumn("managerid", Type.INT4);
- schema.addColumn("empid", Type.INT4);
- schema.addColumn("deptname", Type.TEXT);
-
- this.idxSchema = new Schema();
- idxSchema.addColumn("managerid", Type.INT4);
- SortSpec[] sortKeys = new SortSpec[1];
- sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
- this.comp = new BaseTupleComparator(idxSchema, sortKeys);
-
- this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
- BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
- writer.setLoadNum(100);
- writer.open();
- long offset;
-
- meta = CatalogUtil.newTableMeta("TEXT");
- tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
- fs = tablePath.getFileSystem(conf);
- fs.mkdirs(tablePath.getParent());
-
- FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath);
- appender.init();
- VTuple tuple = new VTuple(schema.size());
- for (int i = 0; i < 10000; i++) {
-
- VTuple key = new VTuple(this.idxSchema.size());
- int rndKey = rnd.nextInt(250);
- if(this.randomValues.containsKey(rndKey)) {
- int t = this.randomValues.remove(rndKey) + 1;
- this.randomValues.put(rndKey, t);
- } else {
- this.randomValues.put(rndKey, 1);
- }
-
- key.put(new Datum[] { DatumFactory.createInt4(rndKey) });
- tuple.put(new Datum[] { DatumFactory.createInt4(rndKey),
- DatumFactory.createInt4(rnd.nextInt(10)),
- DatumFactory.createText("dept_" + rnd.nextInt(10)) });
- offset = appender.getOffset();
- appender.addTuple(tuple);
- writer.write(key, offset);
- }
- appender.flush();
- appender.close();
- writer.close();
-
- TableDesc desc = new TableDesc(
- CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
- tablePath.toUri());
- catalog.createTable(desc);
-
- analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(conf);
- }
-
- @After
- public void tearDown() {
- util.shutdownCatalogCluster();
- }
-
- @Test
- public void testEqual() throws Exception {
- this.rndKey = rnd.nextInt(250);
- final String QUERY = "select * from employee where managerId = " + rndKey;
-
- FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
- Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testEqual");
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
- LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
- Expr expr = analyzer.parse(QUERY);
- LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
- LogicalNode rootNode = optimizer.optimize(plan);
-
- TmpPlanner phyPlanner = new TmpPlanner(conf);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
- int tupleCount = this.randomValues.get(rndKey);
- int counter = 0;
- exec.init();
- while (exec.next() != null) {
- counter ++;
- }
- exec.close();
- assertEquals(tupleCount , counter);
- }
-
- private class TmpPlanner extends PhysicalPlannerImpl {
- public TmpPlanner(TajoConf conf) {
- super(conf);
- }
-
- @Override
- public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
- throws IOException {
- Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
- "Error: There is no table matched to %s", scanNode.getTableName());
-
- List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName()));
-
- Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
-
- return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
-
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/9840d378/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 292c414..b9ba2de 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -62,6 +62,7 @@ public class TestHashAntiJoinExec {
private LogicalPlanner planner;
private LogicalOptimizer optimizer;
private Path testDir;
+ private QueryContext queryContext;
private TableDesc employee;
private TableDesc people;
@@ -126,11 +127,12 @@ public class TestHashAntiJoinExec {
appender.flush();
appender.close();
+ queryContext = new QueryContext(conf);
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
- optimizer = new LogicalOptimizer(conf);
+ optimizer = new LogicalOptimizer(conf, catalog);
}
@After
@@ -157,7 +159,7 @@ public class TestHashAntiJoinExec {
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testHashAntiJoin");
- TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+ TaskAttemptContext ctx = new TaskAttemptContext(queryContext,
LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir);
ctx.setEnforcer(new Enforcer());
Expr expr = analyzer.parse(QUERIES[0]);
|