From: jamestaylor@apache.org
To: commits@phoenix.apache.org
Subject: phoenix git commit: PHOENIX-3941 Filter regions to scan for local indexes based on data table leading pk filter conditions
Date: Thu, 8 Feb 2018 07:24:19 +0000 (UTC)

Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 76573d67b -> ea57ee008

PHOENIX-3941 Filter regions to scan for local indexes based on data table leading pk filter conditions

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ea57ee00
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ea57ee00
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ea57ee00

Branch: refs/heads/4.x-HBase-0.98
Commit: ea57ee0081050f01a0d13a16a415581eb9ea728a
Parents: 76573d6
Author: James Taylor
Authored: Wed Feb 7 23:02:44 2018 -0800
Committer: James Taylor
Committed: Wed Feb 7 23:24:00 2018 -0800

----------------------------------------------------------------------
 .../apache/phoenix/compile/DeleteCompiler.java  |   2 +-
 .../org/apache/phoenix/compile/ExplainPlan.java |  10 +
 .../apache/phoenix/compile/JoinCompiler.java    |  10 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |   2 +-
 .../apache/phoenix/compile/QueryCompiler.java   |  32 ++-
 .../org/apache/phoenix/compile/ScanRanges.java  |  12 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |   4 +-
 .../apache/phoenix/execute/AggregatePlan.java   |  12 +-
 .../apache/phoenix/execute/BaseQueryPlan.java   |   4 +-
 .../execute/LiteralResultIterationPlan.java     |   2 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  18 +-
 .../phoenix/iterate/BaseResultIterators.java    | 226 ++++++++++++++++++-
 .../apache/phoenix/iterate/ExplainTable.java    |   3 +-
 .../phoenix/iterate/ParallelIterators.java      |   8 +-
 .../apache/phoenix/iterate/SerialIterators.java |   4 +-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   2 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |   4 +-
 .../query/ConnectionlessQueryServicesImpl.java  |   8 +-
 .../phoenix/compile/QueryCompilerTest.java      | 226 +++++++++++++++++++
.../apache/phoenix/query/KeyRangeClipTest.java | 155 +++++++++++++ .../query/ParallelIteratorsSplitTest.java | 2 +- 21 files changed, 680 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index fd80238..54e63d2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -595,7 +595,7 @@ public class DeleteCompiler { } final RowProjector projector = projectorToBe; final QueryPlan aggPlan = new AggregatePlan(context, select, dataPlan.getTableRef(), projector, null, null, - OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, dataPlan); return new ServerSelectDeleteMutationPlan(dataPlan, connection, aggPlan, projector, maxSize, maxSizeBytes); } else { final DeletingParallelIteratorFactory parallelIteratorFactory = parallelIteratorFactoryToBe; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java index 2bc7809..ef34daa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ExplainPlan.java @@ -34,4 +34,14 @@ public class ExplainPlan { public List getPlanSteps() { return planSteps; } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + for (String step : planSteps) { + buf.append(step); + buf.append('\n'); + } + return buf.toString(); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java index 887e2d2..f9d8711 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/JoinCompiler.java @@ -17,8 +17,8 @@ */ package org.apache.phoenix.compile; -import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import static org.apache.phoenix.schema.PTable.ImmutableStorageScheme.ONE_CELL_PER_COLUMN; +import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import java.sql.SQLException; import java.util.ArrayList; @@ -80,8 +80,6 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; -import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; import org.apache.phoenix.schema.PTableImpl; import org.apache.phoenix.schema.PTableType; 
import org.apache.phoenix.schema.ProjectedColumn; @@ -1175,7 +1173,7 @@ public class JoinCompiler { } JoinTable join = compile(statement, select, resolver); if (groupByTableRef != null || orderByTableRef != null) { - QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false); + QueryCompiler compiler = new QueryCompiler(statement, select, resolver, false, null); List binds = statement.getParameters(); StatementContext ctx = new StatementContext(statement, resolver, new Scan(), new SequenceManager(statement)); QueryPlan plan = compiler.compileJoinQuery(ctx, binds, join, false, false, null); @@ -1197,6 +1195,10 @@ public class JoinCompiler { List groupBy = tableRef.equals(groupByTableRef) ? select.getGroupBy() : null; List orderBy = tableRef.equals(orderByTableRef) ? select.getOrderBy() : null; SelectStatement stmt = getSubqueryForOptimizedPlan(select.getHint(), table.getDynamicColumns(), table.getTableSamplingRate(), tableRef, join.getColumnRefs(), table.getPreFiltersCombined(), groupBy, orderBy, table.isWildCardSelect(), select.hasSequence(), select.getUdfParseNodes()); + // TODO: As part of PHOENIX-4585, we need to make sure this plan has a pointer to the data plan + // when an index is used instead of the data table, and that this method returns that + // state for downstream processing. + // TODO: It seems inefficient to be recompiling the statement again and again inside of this optimize call QueryPlan plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, stmt); if (!plan.getTableRef().equals(tableRef)) { replacement.put(tableRef, plan.getTableRef()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java index e5ed6a5..709534e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java @@ -283,7 +283,7 @@ public class PostDDLCompiler { continue; } QueryPlan plan = new AggregatePlan(context, select, tableRef, projector, null, null, - OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, null); try { ResultIterator iterator = plan.iterator(); try { http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java index 2258f28..9443110 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java @@ -104,16 +104,13 @@ public class QueryCompiler { private final boolean projectTuples; private final boolean useSortMergeJoin; private final boolean noChildParentJoinOptimization; + private final QueryPlan dataPlan; - public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver) throws SQLException { - this(statement, select, resolver, Collections.emptyList(), null, new SequenceManager(statement), true); + public
QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples, QueryPlan dataPlan) throws SQLException { + this(statement, select, resolver, Collections.emptyList(), null, new SequenceManager(statement), projectTuples, dataPlan); } - public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, boolean projectTuples) throws SQLException { - this(statement, select, resolver, Collections.emptyList(), null, new SequenceManager(statement), projectTuples); - } - - public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples) throws SQLException { + public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager, boolean projectTuples, QueryPlan dataPlan) throws SQLException { this.statement = statement; this.select = select; this.resolver = resolver; @@ -133,10 +130,11 @@ public class QueryCompiler { scan.setCaching(statement.getFetchSize()); this.originalScan = ScanUtil.newScan(scan); + this.dataPlan = dataPlan; } public QueryCompiler(PhoenixStatement statement, SelectStatement select, ColumnResolver resolver, List targetColumns, ParallelIteratorFactory parallelIteratorFactory, SequenceManager sequenceManager) throws SQLException { - this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true); + this(statement, select, resolver, targetColumns, parallelIteratorFactory, sequenceManager, true, null); } /** @@ -495,7 +493,7 @@ public class QueryCompiler { } int maxRows = this.statement.getMaxRows(); this.statement.setMaxRows(pushDownMaxRows ? maxRows : 0); // overwrite maxRows to avoid its impact on inner queries. - QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false).compile(); + QueryPlan plan = new QueryCompiler(this.statement, subquery, resolver, false, dataPlan).compile(); plan = statement.getConnection().getQueryServices().getOptimizer().optimize(statement, plan); this.statement.setMaxRows(maxRows); // restore maxRows. return plan; @@ -581,14 +579,14 @@ public class QueryCompiler { QueryPlan plan = innerPlan; if (plan == null) { ParallelIteratorFactory parallelIteratorFactory = asSubquery ? null : this.parallelIteratorFactory; - plan = select.getFrom() == null - ? new LiteralResultIterationPlan(context, select, tableRef, projector, limit, offset, orderBy, - parallelIteratorFactory) - : (select.isAggregate() || select.isDistinct() - ? new AggregatePlan(context, select, tableRef, projector, limit, offset, orderBy, - parallelIteratorFactory, groupBy, having) - : new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy, - parallelIteratorFactory, allowPageFilter)); + plan = select.getFrom() == null + ? new LiteralResultIterationPlan(context, select, tableRef, projector, limit, offset, orderBy, + parallelIteratorFactory) + : (select.isAggregate() || select.isDistinct() + ? 
new AggregatePlan(context, select, tableRef, projector, limit, offset, orderBy, + parallelIteratorFactory, groupBy, having, dataPlan) + : new ScanPlan(context, select, tableRef, projector, limit, offset, orderBy, + parallelIteratorFactory, allowPageFilter, dataPlan)); } if (!subqueries.isEmpty()) { int count = subqueries.size(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java index a34794b..0fcb6a7 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java @@ -147,8 +147,10 @@ public class ScanRanges { scanRange = KeyRange.getKeyRange(minKey, maxKey); } if (minMaxRange != KeyRange.EVERYTHING_RANGE) { - minMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable()); - scanRange = scanRange.intersect(minMaxRange); + // Intersect using modified min/max range, but keep original range to ensure it + // can still be decomposed into its parts + KeyRange inclusiveExclusiveMinMaxRange = ScanUtil.convertToInclusiveExclusiveRange(minMaxRange, schema, new ImmutableBytesWritable()); + scanRange = scanRange.intersect(inclusiveExclusiveMinMaxRange); } if (scanRange == KeyRange.EMPTY_RANGE) { @@ -568,7 +570,7 @@ public class ScanRanges { } public int getBoundPkColumnCount() { - return this.useSkipScanFilter ? ScanUtil.getRowKeyPosition(slotSpan, ranges.size()) : Math.max(getBoundPkSpan(ranges, slotSpan), getBoundMinMaxSlotCount()); + return Math.max(getBoundPkSpan(ranges, slotSpan), getBoundMinMaxSlotCount()); } private int getBoundMinMaxSlotCount() { @@ -620,6 +622,10 @@ public class ScanRanges { public int[] getSlotSpans() { return slotSpan; } + + public KeyRange getScanRange() { + return scanRange; + } public boolean hasEqualityConstraint(int pkPosition) { int pkOffset = 0; http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java index 37b61ac..fcac335 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java @@ -548,7 +548,7 @@ public class UpsertCompiler { select = SelectStatement.create(select, hint); // Pass scan through if same table in upsert and select so that projection is computed correctly // Use optimizer to choose the best plan - QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false); + QueryCompiler compiler = new QueryCompiler(statement, select, selectResolver, targetColumns, parallelIteratorFactoryToBe, new SequenceManager(statement), false, null); queryPlanToBe = compiler.compile(); // This is post-fix: if the tableRef is a projected table, this means there are post-processing // steps and parallelIteratorFactory did not take effect.
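----------------------------------------------------------------------
The gist of this patch: a local index lives in the same regions as its
data table, so when a query against the local index constrains leading
primary key columns that the index shares with the data table, the data
table's clipped scan range already tells us which regions can contain
matching rows. Below is a minimal, self-contained model of that pruning
step. It is hypothetical illustration code, not Phoenix's API: region
boundaries are modeled as a sorted list of start keys, and keys are
compared as unsigned bytes the way HBase orders row keys.

    import java.util.Arrays;
    import java.util.List;

    public class RegionPruningSketch {
        // Given sorted region start keys (the first region starts at the
        // empty key) and a clipped data-table key range [lower, upper),
        // return the index range of regions that can intersect it.
        static int[] prune(List<byte[]> regionStartKeys, byte[] lower, byte[] upper) {
            int first = 0;
            int last = regionStartKeys.size() - 1;
            // Region i covers [start[i], start[i+1]), so skip leading regions
            // while the *next* region still starts at or before lower.
            while (first < last && Arrays.compareUnsigned(regionStartKeys.get(first + 1), lower) <= 0) {
                first++;
            }
            // Skip trailing regions that start at or after the exclusive upper bound.
            while (last > first && Arrays.compareUnsigned(regionStartKeys.get(last), upper) >= 0) {
                last--;
            }
            return new int[] { first, last };
        }

        public static void main(String[] args) {
            // Matches SPLIT ON ('A','C','E','G','I') from the tests below: six regions.
            List<byte[]> starts = List.of(new byte[0], "A".getBytes(), "C".getBytes(),
                    "E".getBytes(), "G".getBytes(), "I".getBytes());
            // WHERE A = 'B' on the data table clips to the prefix range ['B', 'C').
            int[] kept = prune(starts, "B".getBytes(), "C".getBytes());
            System.out.println("regions " + kept[0] + " to " + kept[1]); // regions 1 to 1
        }
    }

With those split points, only the single region covering ['A','C') is
scanned, which is exactly what testSingleColLocalIndexPruning in
QueryCompilerTest asserts via the scan's start and stop rows.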
@@ -696,7 +696,7 @@ public class UpsertCompiler { scan.setAttribute(BaseScannerRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); // Ignore order by - it has no impact - final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null); + final QueryPlan aggPlan = new AggregatePlan(context, select, statementContext.getCurrentTable(), aggProjector, null,null, OrderBy.EMPTY_ORDER_BY, null, GroupBy.EMPTY_GROUP_BY, null, originalQueryPlan); return new ServerUpsertSelectMutationPlan(queryPlan, tableRef, originalQueryPlan, context, connection, scan, aggPlan, aggProjector, maxSize, maxSizeBytes); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java index 4c29abe..37e0c5a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java @@ -88,17 +88,17 @@ public class AggregatePlan extends BaseQueryPlan { public AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, - ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having) throws SQLException { + ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having, QueryPlan dataPlan) throws SQLException { this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, groupBy, having, - null); + null, dataPlan); } private AggregatePlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, GroupBy groupBy, Expression having, - Expression dynamicFilter) throws SQLException { + Expression dynamicFilter, QueryPlan dataPlan) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit, offset, - orderBy, groupBy, parallelIteratorFactory, dynamicFilter); + orderBy, groupBy, parallelIteratorFactory, dynamicFilter, dataPlan); this.having = having; this.aggregators = context.getAggregationManager().getAggregators(); boolean hasSerialHint = statement.getHint().hasHint(HintNode.Hint.SERIAL); @@ -223,8 +223,8 @@ public class AggregatePlan extends BaseQueryPlan { } } BaseResultIterators iterators = isSerial - ? new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches) - : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches); + ? 
new SerialIterators(this, null, null, wrapParallelIteratorFactory(), scanGrouper, scan, caches, dataPlan) + : new ParallelIterators(this, null, wrapParallelIteratorFactory(), scan, false, caches, dataPlan); estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); estimateInfoTimestamp = iterators.getEstimateInfoTimestamp(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java index d698bfa..d923d2f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java @@ -112,6 +112,7 @@ public abstract class BaseQueryPlan implements QueryPlan { * immediately before creating the ResultIterator. */ protected final Expression dynamicFilter; + protected final QueryPlan dataPlan; protected Long estimatedRows; protected Long estimatedSize; protected Long estimateInfoTimestamp; @@ -122,7 +123,7 @@ public abstract class BaseQueryPlan implements QueryPlan { StatementContext context, FilterableStatement statement, TableRef table, RowProjector projection, ParameterMetaData paramMetaData, Integer limit, Integer offset, OrderBy orderBy, GroupBy groupBy, ParallelIteratorFactory parallelIteratorFactory, - Expression dynamicFilter) { + Expression dynamicFilter, QueryPlan dataPlan) { this.context = context; this.statement = statement; this.tableRef = table; @@ -135,6 +136,7 @@ public abstract class BaseQueryPlan implements QueryPlan { this.groupBy = groupBy; this.parallelIteratorFactory = parallelIteratorFactory; this.dynamicFilter = dynamicFilter; + this.dataPlan = dataPlan; } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java index 86f59c5..4470947 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java @@ -55,7 +55,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan { public LiteralResultIterationPlan(Iterable tuples, StatementContext context, FilterableStatement statement, TableRef tableRef, RowProjector projection, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory) throws SQLException { - super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null); + super(context, statement, tableRef, projection, context.getBindManager().getParameterMetaData(), limit, offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory, null, null); this.tuples = tuples; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java index 1e1cb0d..af25bff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.ServerCacheClient.ServerCache; import org.apache.phoenix.compile.GroupByCompiler.GroupBy; import org.apache.phoenix.compile.OrderByCompiler.OrderBy; +import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.compile.RowProjector; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; @@ -91,14 +92,17 @@ public class ScanPlan extends BaseQueryPlan { private Long serialBytesEstimate; private Long serialEstimateInfoTs; - public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter) throws SQLException { - this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null); + public ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, + Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, + QueryPlan dataPlan) throws SQLException { + this(context, statement, table, projector, limit, offset, orderBy, parallelIteratorFactory, allowPageFilter, null, dataPlan); } - private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter) throws SQLException { + private ScanPlan(StatementContext context, FilterableStatement statement, TableRef table, RowProjector projector, Integer limit, Integer offset, + OrderBy orderBy, ParallelIteratorFactory parallelIteratorFactory, boolean allowPageFilter, Expression dynamicFilter, QueryPlan dataPlan) throws SQLException { super(context, statement, table, projector, context.getBindManager().getParameterMetaData(), limit,offset, orderBy, GroupBy.EMPTY_GROUP_BY, parallelIteratorFactory != null ? 
parallelIteratorFactory : - buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter); + buildResultIteratorFactory(context, statement, table, orderBy, limit, offset, allowPageFilter), dynamicFilter, dataPlan); this.allowPageFilter = allowPageFilter; boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty(); if (isOrdered) { // TopN @@ -235,11 +239,11 @@ public class ScanPlan extends BaseQueryPlan { && isDataToScanWithinThreshold; BaseResultIterators iterators; if (isOffsetOnServer) { - iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches); + iterators = new SerialIterators(this, perScanLimit, offset, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan); } else if (isSerial) { - iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches); + iterators = new SerialIterators(this, perScanLimit, null, parallelIteratorFactory, scanGrouper, scan, caches, dataPlan); } else { - iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches); + iterators = new ParallelIterators(this, perScanLimit, parallelIteratorFactory, scanGrouper, scan, initFirstScanOnly, caches, dataPlan); } estimatedRows = iterators.getEstimatedRowCount(); estimatedSize = iterators.getEstimatedByteCount(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index f9b90b4..311e206 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -37,6 +37,7 @@ import java.io.DataInputStream; import java.io.EOFException; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import java.util.Iterator; @@ -95,6 +96,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.ColumnFamilyNotFoundException; +import org.apache.phoenix.schema.PColumn; import org.apache.phoenix.schema.PColumnFamily; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.ImmutableStorageScheme; @@ -103,9 +105,11 @@ import org.apache.phoenix.schema.PTable.QualifierEncodingScheme; import org.apache.phoenix.schema.PTable.ViewType; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.RowKeySchema; import org.apache.phoenix.schema.StaleRegionBoundaryCacheException; import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; +import org.apache.phoenix.schema.ValueSchema.Field; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.schema.stats.StatisticsUtil; @@ -159,6 +163,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private Scan scan; private final boolean useStatsForParallelization; protected Map caches; + private final 
QueryPlan dataPlan; static final Function TO_KEY_RANGE = new Function() { @Override @@ -475,13 +480,14 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } - public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map caches) throws SQLException { + public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer offset, ParallelScanGrouper scanGrouper, Scan scan, Map caches, QueryPlan dataPlan) throws SQLException { super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), plan.getOrderBy(), plan.getStatement().getHint(), QueryUtil.getOffsetLimit(plan.getLimit(), plan.getOffset()), offset); this.plan = plan; this.scan = scan; this.caches = caches; this.scanGrouper = scanGrouper; + this.dataPlan = dataPlan; StatementContext context = plan.getContext(); // Clone MutationState as the one on the connection will change if auto commit is on // yet we need the original one with the original transaction from TableResultIterator. @@ -683,6 +689,173 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private long rowsEstimate; } + private int computeColumnsInCommon() { + PTable dataTable; + if ((dataTable=dataPlan.getTableRef().getTable()).getBucketNum() != null) { // unable to compute prefix range for salted data table + return 0; + } + + PTable table = getTable(); + int nColumnsOffset = dataTable.isMultiTenant() ? 1 :0; + int nColumnsInCommon = nColumnsOffset; + List dataPKColumns = dataTable.getPKColumns(); + List indexPKColumns = table.getPKColumns(); + int nIndexPKColumns = indexPKColumns.size(); + int nDataPKColumns = dataPKColumns.size(); + // Skip INDEX_ID and tenant ID columns + for (int i = 1 + nColumnsInCommon; i < nIndexPKColumns; i++) { + PColumn indexColumn = indexPKColumns.get(i); + String indexColumnName = indexColumn.getName().getString(); + String cf = IndexUtil.getDataColumnFamilyName(indexColumnName); + if (cf.length() != 0) { + break; + } + if (i > nDataPKColumns) { + break; + } + PColumn dataColumn = dataPKColumns.get(i-1); + String dataColumnName = dataColumn.getName().getString(); + // Ensure both name and type are the same. Because of the restrictions we have + // on PK column types (namely that you can only have a fixed width nullable + // column as your last column), the type check is more of a sanity check + // since it wouldn't make sense to have an index with every column in common. + if (indexColumn.getDataType() == dataColumn.getDataType() + && dataColumnName.equals(IndexUtil.getDataColumnName(indexColumnName))) { + nColumnsInCommon++; + continue; + } + break; + } + return nColumnsInCommon; + } + + // public for testing + public static ScanRanges computePrefixScanRanges(ScanRanges dataScanRanges, int nColumnsInCommon) { + if (nColumnsInCommon == 0) { + return ScanRanges.EVERYTHING; + } + + int offset = 0; + List> cnf = Lists.newArrayListWithExpectedSize(nColumnsInCommon); + int[] slotSpan = new int[nColumnsInCommon]; + boolean useSkipScan = false; + boolean hasRange = false; + List> rangesList = dataScanRanges.getRanges(); + int rangesListSize = rangesList.size(); + while (offset < nColumnsInCommon && offset < rangesListSize) { + List ranges = rangesList.get(offset); + // We use a skip scan if we have multiple ranges or if + // we have a non single key range before the last range. 
+ useSkipScan |= ranges.size() > 1 || hasRange; + cnf.add(ranges); + int rangeSpan = 1 + dataScanRanges.getSlotSpans()[offset]; + if (offset + rangeSpan > nColumnsInCommon) { + rangeSpan = nColumnsInCommon - offset; + // trim range to only be rangeSpan in length + ranges = Lists.newArrayListWithExpectedSize(cnf.get(cnf.size()-1).size()); + for (KeyRange range : cnf.get(cnf.size()-1)) { + range = clipRange(dataScanRanges.getSchema(), offset, rangeSpan, range); + ranges.add(range); + } + cnf.set(cnf.size()-1, ranges); + } + for (KeyRange range : ranges) { + if (!range.isSingleKey()) { + hasRange = true; + } + } + slotSpan[offset] = rangeSpan - 1; + offset = offset + rangeSpan; + } + useSkipScan &= dataScanRanges.useSkipScanFilter(); + KeyRange minMaxRange = + clipRange(dataScanRanges.getSchema(), 0, nColumnsInCommon, dataScanRanges.getMinMaxRange()); + slotSpan = slotSpan.length == cnf.size() ? slotSpan : Arrays.copyOf(slotSpan, cnf.size()); + ScanRanges commonScanRanges = ScanRanges.create(dataScanRanges.getSchema(), cnf, slotSpan, minMaxRange, null, useSkipScan, -1); + return commonScanRanges; + } + + /** + * Truncates range to be a max of rangeSpan fields + * @param schema row key schema + * @param fieldIndex starting index of field within the row key schema + * @param rangeSpan maximum field length + * @return the same range if unchanged and otherwise a new range + */ + public static KeyRange clipRange(RowKeySchema schema, int fieldIndex, int rangeSpan, KeyRange range) { + if (range == KeyRange.EVERYTHING_RANGE) { + return range; + } + if (range == KeyRange.EMPTY_RANGE) { + return range; + } + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); + boolean newRange = false; + boolean lowerUnbound = range.lowerUnbound(); + boolean lowerInclusive = range.isLowerInclusive(); + byte[] lowerRange = range.getLowerRange(); + if (!lowerUnbound && lowerRange.length > 0) { + if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, lowerRange, ptr, true)) { + // Make lower range inclusive since we're decreasing the range by chopping the last part off + lowerInclusive = true; + lowerRange = ptr.copyBytes(); + newRange = true; + } + } + boolean upperUnbound = range.upperUnbound(); + boolean upperInclusive = range.isUpperInclusive(); + byte[] upperRange = range.getUpperRange(); + if (!upperUnbound && upperRange.length > 0) { + if (clipKeyRangeBytes(schema, fieldIndex, rangeSpan, upperRange, ptr, false)) { + // Make upper range inclusive since we're decreasing the range by chopping the last part off + upperInclusive = true; + upperRange = ptr.copyBytes(); + newRange = true; + } + } + + return newRange ?
KeyRange.getKeyRange(lowerRange, lowerInclusive, upperRange, upperInclusive) : range; + } + + private static boolean clipKeyRangeBytes(RowKeySchema schema, int fieldIndex, int rangeSpan, byte[] rowKey, ImmutableBytesWritable ptr, boolean trimTrailingNulls) { + int position = 0; + int maxOffset = schema.iterator(rowKey, ptr); + byte[] newRowKey = new byte[rowKey.length]; + int offset = 0; + int trailingNullsToTrim = 0; + do { + if (schema.next(ptr, fieldIndex, maxOffset) == null) { + break; + } + System.arraycopy(ptr.get(), ptr.getOffset(), newRowKey, offset, ptr.getLength()); + offset += ptr.getLength(); + Field field = schema.getField(fieldIndex); + if (field.getDataType().isFixedWidth()) { + trailingNullsToTrim = 0; + } else { + boolean isNull = ptr.getLength() == 0; + byte sepByte = SchemaUtil.getSeparatorByte(true, isNull, field); + newRowKey[offset++] = sepByte; + if (isNull) { + if (trimTrailingNulls) { + trailingNullsToTrim++; + } else { + trailingNullsToTrim = 0; + } + } else { + // So that last zero separator byte is always trimmed + trailingNullsToTrim = 1; + } + } + fieldIndex++; + } while (++position < rangeSpan); + // remove trailing nulls + ptr.set(newRowKey, 0, offset - trailingNullsToTrim); + // return true if we've clipped the rowKey + return maxOffset != offset; + } + /** * Compute the list of parallel scans to run for a given query. The inner scans * may be concatenated together directly, while the other ones may need to be @@ -704,26 +877,43 @@ public abstract class BaseResultIterators extends ExplainTable implements Result // case we generate an empty guide post with the byte estimate being set as guide post // width. boolean emptyGuidePost = gps.isEmptyGuidePost(); + byte[] startRegionBoundaryKey = startKey; + byte[] stopRegionBoundaryKey = stopKey; + int columnsInCommon = 0; + ScanRanges prefixScanRanges = ScanRanges.EVERYTHING; boolean traverseAllRegions = isSalted || isLocalIndex; - if (!traverseAllRegions) { + if (isLocalIndex) { + // TODO: when implementing PHOENIX-4585, we should change this to an assert + // as we should always have a data plan when a local index is being used. 
+ if (dataPlan != null && dataPlan.getTableRef().getTable().getType() != PTableType.INDEX) { // Sanity check + prefixScanRanges = computePrefixScanRanges(dataPlan.getContext().getScanRanges(), columnsInCommon=computeColumnsInCommon()); + KeyRange prefixRange = prefixScanRanges.getScanRange(); + if (!prefixRange.lowerUnbound()) { + startRegionBoundaryKey = prefixRange.getLowerRange(); + } + if (!prefixRange.upperUnbound()) { + stopRegionBoundaryKey = prefixRange.getUpperRange(); + } + } + } else if (!traverseAllRegions) { byte[] scanStartRow = scan.getStartRow(); if (scanStartRow.length != 0 && Bytes.compareTo(scanStartRow, startKey) > 0) { - startKey = scanStartRow; + startRegionBoundaryKey = startKey = scanStartRow; } byte[] scanStopRow = scan.getStopRow(); if (stopKey.length == 0 || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) { - stopKey = scanStopRow; + stopRegionBoundaryKey = stopKey = scanStopRow; } } int regionIndex = 0; int stopIndex = regionBoundaries.size(); - if (startKey.length > 0) { - regionIndex = getIndexContainingInclusive(regionBoundaries, startKey); + if (startRegionBoundaryKey.length > 0) { + regionIndex = getIndexContainingInclusive(regionBoundaries, startRegionBoundaryKey); } - if (stopKey.length > 0) { - stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopKey)); + if (stopRegionBoundaryKey.length > 0) { + stopIndex = Math.min(stopIndex, regionIndex + getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), stopRegionBoundaryKey)); if (isLocalIndex) { stopKey = regionLocations.get(stopIndex).getRegionInfo().getEndKey(); } @@ -773,15 +963,29 @@ public abstract class BaseResultIterators extends ExplainTable implements Result HRegionLocation regionLocation = regionLocations.get(regionIndex); HRegionInfo regionInfo = regionLocation.getRegionInfo(); byte[] currentGuidePostBytes = currentGuidePost.copyBytes(); - byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY; + byte[] endKey; if (regionIndex == stopIndex) { endKey = stopKey; } else { endKey = regionBoundaries.get(regionIndex); } if (isLocalIndex) { - endRegionKey = regionInfo.getEndKey(); - keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey); + // Only attempt further pruning if the prefix range is using + // a skip scan since we've already pruned the range of regions + // based on the start/stop key. 
+ if (columnsInCommon > 0 && prefixScanRanges.useSkipScanFilter()) { + byte[] regionStartKey = regionInfo.getStartKey(); + ImmutableBytesWritable ptr = context.getTempPtr(); + clipKeyRangeBytes(prefixScanRanges.getSchema(), 0, columnsInCommon, regionStartKey, ptr, false); + regionStartKey = ByteUtil.copyKeyBytesIfNecessary(ptr); + // Prune this region if there's no intersection + if (!prefixScanRanges.intersectRegion(regionStartKey, regionInfo.getEndKey(), false)) { + currentKeyBytes = endKey; + regionIndex++; + continue; + } + } + keyOffset = ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), regionInfo.getEndKey()); } byte[] initialKeyBytes = currentKeyBytes; while (intersectWithGuidePosts && (endKey.length == 0 || currentGuidePost.compareTo(endKey) <= 0)) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 1c1d008..2c91d47 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -261,7 +261,8 @@ public abstract class ExplainTable { if (minMaxRange != KeyRange.EVERYTHING_RANGE) { RowKeySchema schema = tableRef.getTable().getRowKeySchema(); if (!minMaxRange.isUnbound(bound)) { - minMaxIterator = new RowKeyValueIterator(schema, minMaxRange.getRange(bound)); + // Use scan ranges from ScanRanges since it will have been intersected with minMaxRange + minMaxIterator = new RowKeyValueIterator(schema, scanRanges.getScanRange().getRange(bound)); } } boolean isLocalIndex = ScanUtil.isLocalIndex(context.getScan()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java index f5d226f..1845bfe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java @@ -58,16 +58,16 @@ public class ParallelIterators extends BaseResultIterators { private final ParallelIteratorFactory iteratorFactory; private final boolean initFirstScanOnly; - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map caches) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, boolean initFirstScanOnly, Map caches, QueryPlan dataPlan) throws SQLException { - super(plan, perScanLimit, null, scanGrouper, scan,caches); + super(plan, perScanLimit, null, scanGrouper, scan,caches, dataPlan); this.iteratorFactory = iteratorFactory; this.initFirstScanOnly = initFirstScanOnly; } - public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, Map caches) + public ParallelIterators(QueryPlan plan, Integer perScanLimit, ParallelIteratorFactory iteratorFactory, Scan scan, boolean initOneScanPerRegion, 
Map caches, QueryPlan dataPlan) throws SQLException { - this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches); + this(plan, perScanLimit, iteratorFactory, DefaultParallelScanGrouper.getInstance(), scan, initOneScanPerRegion, caches, dataPlan); } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java index b13749c..c307a2b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java @@ -62,9 +62,9 @@ public class SerialIterators extends BaseResultIterators { private final Integer offset; public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset, - ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map caches) + ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan scan, Map caches, QueryPlan dataPlan) throws SQLException { - super(plan, perScanLimit, offset, scanGrouper, scan, caches); + super(plan, perScanLimit, offset, scanGrouper, scan, caches, dataPlan); this.offset = offset; // must be a offset or a limit specified or a SERIAL hint Preconditions.checkArgument( http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index 1fd15df..f418bc9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -472,7 +472,7 @@ public class PhoenixStatement implements Statement, SQLCloseable { select = StatementNormalizer.normalize(transformedSelect, resolver); } - QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile(); + QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true, null).compile(); plan.getContext().getSequenceManager().validateSequences(seqAction); return plan; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index b3c7ed0..028fc94 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -240,7 +240,7 @@ public class QueryOptimizer { try { // translate nodes that match expressions that are indexed to the associated column parse node indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), 
indexSelect.getUdfParseNodes())); - QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected); + QueryCompiler compiler = new QueryCompiler(statement, indexSelect, resolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan); QueryPlan plan = compiler.compile(); // If query doesn't have where clause and some of columns to project are missing @@ -312,7 +312,7 @@ public class QueryOptimizer { query = SubqueryRewriter.transform(query, queryResolver, statement.getConnection()); queryResolver = FromCompiler.getResolverForQuery(query, statement.getConnection()); query = StatementNormalizer.normalize(query, queryResolver); - QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected).compile(); + QueryPlan plan = new QueryCompiler(statement, query, queryResolver, targetColumns, parallelIteratorFactory, dataPlan.getContext().getSequenceManager(), isProjected, dataPlan).compile(); return plan; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index 4ffbb23..7005790 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -81,6 +81,7 @@ import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.stats.GuidePostsInfo; import org.apache.phoenix.schema.stats.GuidePostsKey; import org.apache.phoenix.transaction.TransactionFactory; +import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.JDBCUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -237,7 +238,12 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public MetaDataMutationResult createTable(List tableMetaData, byte[] physicalName, PTableType tableType, Map tableProps, List>> families, byte[][] splits, boolean isNamespaceMapped, boolean allocateIndexId) throws SQLException { - if (splits != null) { + if (tableType == PTableType.INDEX && IndexUtil.isLocalIndexFamily(Bytes.toString(families.iterator().next().getFirst()))) { + Object dataTableName = tableProps.get(PhoenixDatabaseMetaData.DATA_TABLE_NAME); + List regionLocations = tableSplits.get(dataTableName); + byte[] tableName = getTableName(tableMetaData, physicalName); + tableSplits.put(Bytes.toString(tableName), regionLocations); + } else if (splits != null) { byte[] tableName = getTableName(tableMetaData, physicalName); tableSplits.put(Bytes.toString(tableName), generateRegionLocations(tableName, splits)); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java index 
5a672ba..1d61003 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -4161,4 +4161,230 @@ public class QueryCompilerTest extends BaseConnectionlessQueryTest {
             assertEquals(e.getErrorCode(), SQLExceptionCode.CONNECTION_CLOSED.getErrorCode());
         }
     }
+
+    @Test
+    public void testSingleColLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,C)");
+            String query = "SELECT * FROM T WHERE A = 'B' and C='C'";
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("A", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals("C", Bytes.toString(scan.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testMultiColLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    D CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C,\n" +
+                    "        D\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A = 'C' and B = 'X' and D='C'";
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("C", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals("E", Bytes.toString(scan.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testSkipScanLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    D CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C,\n" +
+                    "        D\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A IN ('A','G') and B = 'A' and D = 'D'";
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(2, outerScans.size());
+            List<Scan> innerScans1 = outerScans.get(0);
+            assertEquals(1, innerScans1.size());
+            Scan scan1 = innerScans1.get(0);
+            assertEquals("A", Bytes.toString(scan1.getStartRow()).trim());
+            assertEquals("C", Bytes.toString(scan1.getStopRow()).trim());
+            List<Scan> innerScans2 = outerScans.get(1);
+            assertEquals(1, innerScans2.size());
+            Scan scan2 = innerScans2.get(0);
+            assertEquals("G", Bytes.toString(scan2.getStartRow()).trim());
+            assertEquals("I", Bytes.toString(scan2.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testRVCLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    D CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C,\n" +
+                    "        D\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A='I' and (B,D) IN (('A','D'),('B','I'))";
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("I", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals(0, scan.getStopRow().length);
+        }
+    }
+
+    @Test
+    public void testRVCLocalIndexPruning2() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B VARCHAR,\n" +
+                    "    C VARCHAR,\n" +
+                    "    D VARCHAR,\n" +
+                    "    E VARCHAR,\n" +
+                    "    F VARCHAR,\n" +
+                    "    G VARCHAR,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C,\n" +
+                    "        D,\n" +
+                    "        E,\n" +
+                    "        F,\n" +
+                    "        G\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,C,F,G)");
+            String query = "SELECT * FROM T WHERE (A,B,C,D) IN (('I','D','F','X'),('I','I','G','Y')) and F='X' and G='Y'";
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("I", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals(0, scan.getStopRow().length);
+        }
+    }
+
+    @Test
+    public void testMinMaxRangeLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    D CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C,\n" +
+                    "        D\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(A,B,D)");
+            String query = "SELECT * FROM T WHERE A = 'C' and (A,B,D) > ('C','B','X') and D='C'";
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(1, outerScans.size());
+            List<Scan> innerScans = outerScans.get(0);
+            assertEquals(1, innerScans.size());
+            Scan scan = innerScans.get(0);
+            assertEquals("C", Bytes.toString(scan.getStartRow()).trim());
+            assertEquals("E", Bytes.toString(scan.getStopRow()).trim());
+        }
+    }
+
+    @Test
+    public void testNoLocalIndexPruning() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.createStatement().execute("CREATE TABLE T (\n" +
+                    "    A CHAR(1) NOT NULL,\n" +
+                    "    B CHAR(1) NOT NULL,\n" +
+                    "    C CHAR(1) NOT NULL,\n" +
+                    "    CONSTRAINT PK PRIMARY KEY (\n" +
+                    "        A,\n" +
+                    "        B,\n" +
+                    "        C\n" +
+                    "    )\n" +
+                    ") SPLIT ON ('A','C','E','G','I')");
+            conn.createStatement().execute("CREATE LOCAL INDEX IDX ON T(C)");
+            String query = "SELECT * FROM T WHERE C='C'";
+            PhoenixStatement statement = conn.createStatement().unwrap(PhoenixStatement.class);
+            QueryPlan plan = statement.optimizeQuery(query);
+            assertEquals("IDX", plan.getContext().getCurrentTable().getTable().getName().getString());
+            plan.iterator();
+            List<List<Scan>> outerScans = plan.getScans();
+            assertEquals(6, outerScans.size());
+        }
+    }
 }
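Most of the pruning tests above repeat the same assertion sequence: optimize the query, call plan.iterator() to force the scans to be computed, then inspect plan.getScans(). For readers, a helper along the following lines captures that shared pattern; it is a sketch only, not part of this patch, and assumes the imports already present in QueryCompilerTest:

    // Sketch (not part of this commit): the assertion pattern repeated inline
    // by the single-region pruning tests above.
    private static void assertSingleScanBounds(QueryPlan plan, String startKey, String stopKey)
            throws SQLException {
        plan.iterator(); // scans are computed lazily when the iterator is created
        List<List<Scan>> outerScans = plan.getScans();
        assertEquals(1, outerScans.size());
        List<Scan> innerScans = outerScans.get(0);
        assertEquals(1, innerScans.size());
        Scan scan = innerScans.get(0);
        assertEquals(startKey, Bytes.toString(scan.getStartRow()).trim());
        assertEquals(stopKey, Bytes.toString(scan.getStopRow()).trim());
    }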
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
new file mode 100644
index 0000000..abc435a
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/KeyRangeClipTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.query;
+
+import static org.apache.phoenix.query.KeyRange.UNBOUND;
+import static org.apache.phoenix.query.QueryConstants.DESC_SEPARATOR_BYTE_ARRAY;
+import static org.apache.phoenix.query.QueryConstants.SEPARATOR_BYTE_ARRAY;
+import static org.junit.Assert.assertEquals;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.ScanRanges;
+import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.iterate.BaseResultIterators;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.schema.RowKeySchema;
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PSmallint;
+import org.apache.phoenix.schema.types.PVarchar;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * Test for the clipping of scan ranges to a leading row key prefix, as done by
+ * {@link BaseResultIterators#computePrefixScanRanges(ScanRanges, int)} when pruning
+ * the regions scanned for a local index.
+ */
+@RunWith(Parameterized.class)
+public class KeyRangeClipTest extends BaseConnectionlessQueryTest {
+    private final RowKeySchema schema;
+    private final KeyRange input;
+    private final KeyRange expectedOutput;
+    private final int clipTo;
+
+    private static byte[] getRange(PhoenixConnection pconn, List<Object> startValues) throws SQLException {
+        byte[] lowerRange;
+        if (startValues == null) {
+            lowerRange = KeyRange.UNBOUND;
+        } else {
+            String upsertValues = StringUtils.repeat("?,", startValues.size()).substring(0, startValues.size() * 2 - 1);
+            String upsertStmt = "UPSERT INTO T VALUES(" + upsertValues + ")";
+            PreparedStatement stmt = pconn.prepareStatement(upsertStmt);
+            for (int i = 0; i < startValues.size(); i++) {
+                stmt.setObject(i + 1, startValues.get(i));
+            }
+            stmt.execute();
+            Cell startCell = PhoenixRuntime.getUncommittedDataIterator(pconn).next().getSecond().get(0);
+            lowerRange = CellUtil.cloneRow(startCell);
+            pconn.rollback();
+        }
+        return lowerRange;
+    }
+
+    public KeyRangeClipTest(String tableDef, List<Object> startValues, List<Object> endValues, int clipTo, KeyRange expectedOutput) throws SQLException {
+        PhoenixConnection pconn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class);
+        pconn.createStatement().execute("CREATE TABLE T(" + tableDef + ")");
+        PTable table = pconn.getMetaDataCache().getTableRef(new PTableKey(null, "T")).getTable();
+        this.schema = table.getRowKeySchema();
+        byte[] lowerRange = getRange(pconn, startValues);
+        byte[] upperRange = getRange(pconn, endValues);
+        this.input = KeyRange.getKeyRange(lowerRange, upperRange);
+        this.expectedOutput = expectedOutput;
+        this.clipTo = clipTo;
+    }
+
+    @After
+    public void cleanup() throws SQLException {
+        PhoenixConnection pconn = DriverManager.getConnection(getUrl()).unwrap(PhoenixConnection.class);
+        pconn.createStatement().execute("DROP TABLE T");
+    }
+
+    @Test
+    public void test() {
+        ScanRanges scanRanges = ScanRanges.create(schema,
+                Collections.<List<KeyRange>>singletonList(Collections.singletonList(input)),
+                new int[] {schema.getFieldCount() - 1}, KeyRange.EVERYTHING_RANGE, null, false, -1);
+        ScanRanges clippedRange = BaseResultIterators.computePrefixScanRanges(scanRanges, clipTo);
+        assertEquals(expectedOutput, clippedRange.getScanRange());
+    }
+
+    @Parameters(name="KeyRangeClipTest_{0}")
+    public static Collection<Object> data() {
+        List<Object> testCases = Lists.newArrayList();
+        testCases.add(Lists.newArrayList( // [XY - *]
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C)",
+                Lists.newArrayList("XY",null,"Z"), null, 2,
+                KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C)",
+                null, Lists.newArrayList("XY",null,"Z"), 2,
+                KeyRange.getKeyRange(
+                        ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower
+                        ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)), false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+                Lists.newArrayList("XY",null,null,"Z"), null, 3,
+                KeyRange.getKeyRange(Bytes.toBytes("XY"), true, UNBOUND, false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+                null, Lists.newArrayList("XY",null,null,"Z"), 3,
+                KeyRange.getKeyRange(
+                        ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower
+                        ByteUtil.nextKey(ByteUtil.concat(Bytes.toBytes("XY"),SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY)), false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A CHAR(1) NOT NULL, B CHAR(1) NOT NULL, C CHAR(1) NOT NULL, CONSTRAINT PK PRIMARY KEY (A,B,C)",
+                Lists.newArrayList("A","B","C"), Lists.newArrayList("C","D","E"), 2,
+                KeyRange.getKeyRange(Bytes.toBytes("AB"), true, ByteUtil.nextKey(Bytes.toBytes("CD")), false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C SMALLINT NOT NULL, D VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B,C,D)",
+                Lists.newArrayList("XY",null,1,"Z"), null, 3,
+                KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XY"), SEPARATOR_BYTE_ARRAY, SEPARATOR_BYTE_ARRAY, PSmallint.INSTANCE.toBytes(1)), true, UNBOUND, false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B BIGINT NOT NULL, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A,B DESC,C)",
+                Lists.newArrayList("XYZ",1,"Z"), null, 2,
+                KeyRange.getKeyRange(ByteUtil.concat(Bytes.toBytes("XYZ"), SEPARATOR_BYTE_ARRAY, PLong.INSTANCE.toBytes(1, SortOrder.DESC)), true, UNBOUND, false)).toArray());
+        testCases.add(Lists.newArrayList(
+                "A VARCHAR NOT NULL, B VARCHAR, C VARCHAR, CONSTRAINT PK PRIMARY KEY (A DESC,B,C)",
+                null, Lists.newArrayList("XY",null,"Z"), 3,
+                KeyRange.getKeyRange(
+                        ByteUtil.nextKey(SEPARATOR_BYTE_ARRAY), true, // skips null values for unbound lower
+                        (ByteUtil.concat(PVarchar.INSTANCE.toBytes("XY",SortOrder.DESC),DESC_SEPARATOR_BYTE_ARRAY,SEPARATOR_BYTE_ARRAY,Bytes.toBytes("Z"))), false)).toArray());
+        return testCases;
+    }
+}
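To make the expected clip semantics concrete, take the CHAR(1) test case above: clipping the range [ABC, CDE] to its first two row key columns truncates both bounds, keeping the truncated lower bound inclusive and rolling the truncated upper bound forward with nextKey so the clipped range still covers every row the original range could. Restated as a sketch (this mirrors the test data above; it is not additional patch code):

    // The CHAR(1) test case above: [ABC, CDE] clipped to 2 leading columns.
    byte[] lower = Bytes.toBytes("AB");                   // "ABC" truncated to two columns
    byte[] upper = ByteUtil.nextKey(Bytes.toBytes("CD")); // rolled past "CD" so "CDE" stays covered
    KeyRange expected = KeyRange.getKeyRange(lower, true, upper, false);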
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea57ee00/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 935d8cb..cb34d2b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -487,7 +487,7 @@ public class ParallelIteratorsSplitTest extends BaseConnectionlessQueryTest {
             return null;
         }
 
-        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null);
+        }, null, new SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()), context.getScan(), false, null, null);
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }
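The one-line change above threads one more trailing argument through the ParallelIterators constructor; this test simply passes null for the new parameter. To observe the new pruning from client code, a minimal sketch, assuming the connectionless test URL and the T/IDX schema from testSingleColLocalIndexPruning above:

    // Sketch: count the parallel scan groups a query compiles to once local
    // index pruning is applied; the leading-pk-filtered query above yields 1.
    try (Connection conn = DriverManager.getConnection(getUrl())) {
        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
        QueryPlan plan = stmt.optimizeQuery("SELECT * FROM T WHERE A = 'B' and C='C'");
        plan.iterator(); // forces the scans to be computed
        System.out.println("scan groups: " + plan.getScans().size());
    }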