From: aleksey@apache.org
To: commits@cassandra.apache.org
Date: Wed, 30 Aug 2017 16:38:02 -0000
Subject: [02/10] cassandra git commit: Fix race condition in read command serialization

Fix race condition in read command serialization

patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for CASSANDRA-13363

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit:
http://git-wip-us.apache.org/repos/asf/cassandra/commit/7f297bcf
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7f297bcf
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7f297bcf

Branch: refs/heads/cassandra-3.11
Commit: 7f297bcf8aced983cbc9c4103d0ebefc1789f0dd
Parents: d03c046
Author: Aleksey Yeschenko
Authored: Mon Aug 14 16:43:06 2017 +0100
Committer: Aleksey Yeschenko
Committed: Wed Aug 30 16:16:46 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../cql3/statements/SelectStatement.java        |  16 +-
 .../db/AbstractReadCommandBuilder.java          |   2 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 133 +++++++++++---
 .../org/apache/cassandra/db/ReadCommand.java    | 149 ++++++++-------
 .../db/SinglePartitionReadCommand.java          | 180 ++++++++++++++++---
 .../cassandra/index/SecondaryIndexManager.java  |   9 +-
 .../internal/composites/CompositesSearcher.java |   6 +-
 .../index/internal/keys/KeysSearcher.java       |   3 +-
 .../cassandra/service/AbstractReadExecutor.java |   4 +-
 .../service/pager/PartitionRangeQueryPager.java |   8 +-
 .../cassandra/thrift/CassandraServer.java       |  69 ++++---
 test/unit/org/apache/cassandra/Util.java        |  26 +--
 .../apache/cassandra/db/SecondaryIndexTest.java |  10 +-
 .../db/SinglePartitionSliceCommandTest.java     |  45 ++---
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 16 files changed, 427 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 452dc9b..aca9e1f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.0.15
- * enable segement creation before recovering commitlogs (CASSANDRA-13587)
+ * Fix race condition in read command serialization (CASSANDRA-13363)
+ * Enable segement creation before recovering commitlogs (CASSANDRA-13587)
  * Fix
AssertionError in short read protection (CASSANDRA-13747)
  * Don't skip corrupted sstables on startup (CASSANDRA-13620)
  * Fix the merging of cells with different user type versions (CASSANDRA-13776)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index bd377f4..3882a23 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -540,18 +540,10 @@ public class SelectStatement implements CQLStatement
         if (keyBounds == null)
             return ReadQuery.EMPTY;
 
-        PartitionRangeReadCommand command = new PartitionRangeReadCommand(cfm,
-                                                                          nowInSec,
-                                                                          queriedColumns,
-                                                                          rowFilter,
-                                                                          limit,
-                                                                          new DataRange(keyBounds, clusteringIndexFilter),
-                                                                          Optional.empty());
-        // If there's a secondary index that the command can use, have it validate
-        // the request parameters. Note that as a side effect, if a viable Index is
-        // identified by the CFS's index manager, it will be cached in the command
-        // and serialized during distribution to replicas in order to avoid performing
-        // further lookups.
+        PartitionRangeReadCommand command =
+            PartitionRangeReadCommand.create(false, cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+
+        // If there's a secondary index that the command can use, have it validate the request parameters.
         command.maybeValidateIndex();
 
         return command;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index afbab74..d219816 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@ -336,7 +336,7 @@ public abstract class AbstractReadCommandBuilder
             else
                 bounds = new ExcludingBounds<>(start, end);
 
-            return new PartitionRangeReadCommand(cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
+            return PartitionRangeReadCommand.create(false, cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
         }
 
         static DecoratedKey makeKey(CFMetaData metadata, Object...
partitionKey) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index fb2dd0d..9e557e0 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -20,8 +20,8 @@ package org.apache.cassandra.db; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Optional; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import org.apache.cassandra.config.CFMetaData; @@ -59,31 +59,39 @@ public class PartitionRangeReadCommand extends ReadCommand private final DataRange dataRange; private int oldestUnrepairedTombstone = Integer.MAX_VALUE; - public PartitionRangeReadCommand(boolean isDigest, - int digestVersion, - boolean isForThrift, - CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DataRange dataRange, - Optional index) + private PartitionRangeReadCommand(boolean isDigest, + int digestVersion, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange, + IndexMetadata index) { - super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); this.dataRange = dataRange; - this.index = index; } - public PartitionRangeReadCommand(CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DataRange dataRange, - Optional 
index) + public static PartitionRangeReadCommand create(boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DataRange dataRange) { - this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index); + return new PartitionRangeReadCommand(false, + 0, + isForThrift, + metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + dataRange, + findIndex(metadata, rowFilter)); } /** @@ -96,13 +104,14 @@ public class PartitionRangeReadCommand extends ReadCommand */ public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec) { - return new PartitionRangeReadCommand(metadata, + return new PartitionRangeReadCommand(false, 0, false, + metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, DataRange.allData(metadata.partitioner), - Optional.empty()); + null); } public DataRange dataRange() @@ -122,17 +131,72 @@ public class PartitionRangeReadCommand extends ReadCommand public PartitionRangeReadCommand forSubRange(AbstractBounds range) { - return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index); + return new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), + isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + dataRange().forSubRange(range), + indexMetadata()); } public PartitionRangeReadCommand copy() { - return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index); + return new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), + isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + dataRange(), + indexMetadata()); + } + + public PartitionRangeReadCommand copyAsDigestQuery() + { + 
return new PartitionRangeReadCommand(true, + digestVersion(), + isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + dataRange(), + indexMetadata()); + } + + public PartitionRangeReadCommand withUpdatedDataRange(DataRange newDataRange) + { + return new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), + isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits(), + newDataRange, + indexMetadata()); } - public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) + public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange) { - return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index); + return new PartitionRangeReadCommand(isDigestQuery(), + digestVersion(), + isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + newLimits, + newDataRange, + indexMetadata()); } public long getTimeout() @@ -173,7 +237,8 @@ public class PartitionRangeReadCommand extends ReadCommand metric.rangeLatency.addNano(latencyNanos); } - protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) + @VisibleForTesting + public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) { ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange())); Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator())); @@ -337,7 +402,17 @@ public class PartitionRangeReadCommand extends ReadCommand private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, 
Optional index) + public ReadCommand deserialize(DataInputPlus in, + int version, + boolean isDigest, + int digestVersion, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + IndexMetadata index) throws IOException { DataRange range = DataRange.serializer.deserialize(in, version, metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 76180cc..66985b6 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -22,6 +22,8 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.function.Predicate; +import javax.annotation.Nullable; + import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,24 +108,27 @@ public abstract class ReadCommand implements ReadQuery private final RowFilter rowFilter; private final DataLimits limits; - // SecondaryIndexManager will attempt to provide the most selective of any available indexes - // during execution. Here we also store an the results of that lookup to repeating it over - // the lifetime of the command. - protected Optional index = Optional.empty(); - - // Flag to indicate whether the index manager has been queried to select an index for this - // command. This is necessary as the result of that lookup may be null, in which case we - // still don't want to repeat it. - private boolean indexManagerQueried = false; - - private boolean isDigestQuery; + private final boolean isDigestQuery; // if a digest query, the version for which the digest is expected. Ignored if not a digest. 
private int digestVersion; private final boolean isForThrift; + @Nullable + private final IndexMetadata index; + protected static abstract class SelectionDeserializer { - public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional index) throws IOException; + public abstract ReadCommand deserialize(DataInputPlus in, + int version, + boolean isDigest, + int digestVersion, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + IndexMetadata index) throws IOException; } protected enum Kind @@ -147,7 +152,8 @@ public abstract class ReadCommand implements ReadQuery int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, - DataLimits limits) + DataLimits limits, + IndexMetadata index) { this.kind = kind; this.isDigestQuery = isDigestQuery; @@ -158,6 +164,7 @@ public abstract class ReadCommand implements ReadQuery this.columnFilter = columnFilter; this.rowFilter = rowFilter; this.limits = limits; + this.index = index; } protected abstract void serializeSelection(DataOutputPlus out, int version) throws IOException; @@ -253,18 +260,6 @@ public abstract class ReadCommand implements ReadQuery } /** - * Sets whether this command should be a digest one or not. - * - * @param isDigestQuery whether the command should be set as a digest one or not. - * @return this read command. - */ - public ReadCommand setIsDigestQuery(boolean isDigestQuery) - { - this.isDigestQuery = isDigestQuery; - return this; - } - - /** * Sets the digest version, for when digest for that command is requested. *

* Note that we allow setting this independently of setting the command as a digest query as @@ -291,6 +286,30 @@ public abstract class ReadCommand implements ReadQuery } /** + * Index (metadata) chosen for this query. Can be null. + * + * @return index (metadata) chosen for this query + */ + @Nullable + public IndexMetadata indexMetadata() + { + return index; + } + + /** + * Index instance chosen for this query. Can be null. + * + * @return Index instance chosen for this query. Can be null. + */ + @Nullable + public Index index() + { + return null == index + ? null + : Keyspace.openAndGetStore(metadata).indexManager.getIndex(index); + } + + /** * The clustering index filter this command to use for the provided key. *

* Note that that method should only be called on a key actually queried by this command @@ -310,6 +329,11 @@ public abstract class ReadCommand implements ReadQuery */ public abstract ReadCommand copy(); + /** + * Returns a copy of this command with isDigestQuery set to true. + */ + public abstract ReadCommand copyAsDigestQuery(); + protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup); protected abstract int oldestUnrepairedTombstone(); @@ -321,35 +345,32 @@ public abstract class ReadCommand implements ReadQuery : ReadResponse.createDataResponse(iterator, this); } - public long indexSerializedSize(int version) + long indexSerializedSize(int version) { - if (index.isPresent()) - return IndexMetadata.serializer.serializedSize(index.get(), version); - else - return 0; + return null != index + ? IndexMetadata.serializer.serializedSize(index, version) + : 0; } public Index getIndex(ColumnFamilyStore cfs) { - // if we've already consulted the index manager, and it returned a valid index - // the result should be cached here. - if(index.isPresent()) - return cfs.indexManager.getIndex(index.get()); - - // if no cached index is present, but we've already consulted the index manager - // then no registered index is suitable for this command, so just return null. - if (indexManagerQueried) + return null != index + ? 
cfs.indexManager.getIndex(index) + : null; + } + + static IndexMetadata findIndex(CFMetaData table, RowFilter rowFilter) + { + if (table.getIndexes().isEmpty() || rowFilter.isEmpty()) return null; - // do the lookup, set the flag to indicate so and cache the result if not null - Index selected = cfs.indexManager.getBestIndexFor(this); - indexManagerQueried = true; + ColumnFamilyStore cfs = Keyspace.openAndGetStore(table); - if (selected == null) - return null; + Index index = cfs.indexManager.getBestIndexFor(rowFilter); - index = Optional.of(selected.getIndexMetadata()); - return selected; + return null != index + ? index.getIndexMetadata() + : null; } /** @@ -602,7 +623,7 @@ public abstract class ReadCommand implements ReadQuery assert version >= MessagingService.VERSION_30; out.writeByte(command.kind.ordinal()); - out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent())); + out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(null != command.index)); if (command.isDigestQuery()) out.writeUnsignedVInt(command.digestVersion()); CFMetaData.serializer.serialize(command.metadata(), out, version); @@ -610,8 +631,8 @@ public abstract class ReadCommand implements ReadQuery ColumnFilter.serializer.serialize(command.columnFilter(), out, version); RowFilter.serializer.serialize(command.rowFilter(), out, version); DataLimits.serializer.serialize(command.limits(), out, version); - if (command.index.isPresent()) - IndexMetadata.serializer.serialize(command.index.get(), out, version); + if (null != command.index) + IndexMetadata.serializer.serialize(command.index, out, version); command.serializeSelection(out, version); } @@ -631,18 +652,16 @@ public abstract class ReadCommand implements ReadQuery ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); 
DataLimits limits = DataLimits.serializer.deserialize(in, version); - Optional index = hasIndex - ? deserializeIndexMetadata(in, version, metadata) - : Optional.empty(); + IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null; return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); } - private Optional deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException + private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException { try { - return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm)); + return IndexMetadata.serializer.deserialize(in, version, cfm); } catch (UnknownIndexException e) { @@ -652,7 +671,7 @@ public abstract class ReadCommand implements ReadQuery "index. Please wait for schema agreement after index creation.", cfm.ksName, cfm.cfName, e.indexId.toString()); logger.info(message); - return Optional.empty(); + return null; } } @@ -830,7 +849,7 @@ public abstract class ReadCommand implements ReadQuery else limits = DataLimits.cqlLimits(maxResults); - return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty()); + return PartitionRangeReadCommand.create(true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter)); } static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException @@ -939,9 +958,8 @@ public abstract class ReadCommand implements ReadQuery ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter; ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata); DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter); - return new 
PartitionRangeReadCommand( - command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), - command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty()); + + return command.withUpdatedDataRange(newRange); } static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata) @@ -1096,7 +1114,7 @@ public abstract class ReadCommand implements ReadQuery // missing without any problems, so we can safely always set "inclusive" to false in the data range dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false); } - return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty()); + return PartitionRangeReadCommand.create(true, metadata, nowInSec, selection, rowFilter, limits, dataRange); } public long serializedSize(ReadCommand command, int version) @@ -1290,10 +1308,7 @@ public abstract class ReadCommand implements ReadQuery { Pair selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata); - // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift - return new SinglePartitionReadCommand( - isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE, - key, selectionAndFilter.right); + return SinglePartitionReadCommand.legacyNamesCommand(isDigest, version, metadata, nowInSeconds, selectionAndFilter.left, key, selectionAndFilter.right); } static Pair deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException @@ -1422,8 +1437,7 @@ public abstract class ReadCommand implements ReadQuery else limits = DataLimits.cqlLimits(count); - // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift - return new SinglePartitionReadCommand(isDigest, version, true, metadata, nowInSeconds, 
columnFilter, RowFilter.NONE, limits, key, filter); + return SinglePartitionReadCommand.legacySliceCommand(isDigest, version, metadata, nowInSeconds, columnFilter, limits, key, filter); } private long serializedSliceCommandSize(SinglePartitionReadCommand command) @@ -1605,9 +1619,8 @@ public abstract class ReadCommand implements ReadQuery ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter(); ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata); - return new SinglePartitionReadCommand( - command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), - command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter); + + return command.withUpdatedClusteringIndexFilter(sliceFilter); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 686ec35..00464ca 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -70,18 +70,19 @@ public class SinglePartitionReadCommand extends ReadCommand private int oldestUnrepairedTombstone = Integer.MAX_VALUE; - public SinglePartitionReadCommand(boolean isDigest, - int digestVersion, - boolean isForThrift, - CFMetaData metadata, - int nowInSec, - ColumnFilter columnFilter, - RowFilter rowFilter, - DataLimits limits, - DecoratedKey partitionKey, - ClusteringIndexFilter clusteringIndexFilter) + private SinglePartitionReadCommand(boolean isDigest, + int digestVersion, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits 
limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter, + IndexMetadata index) { - super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); assert partitionKey.getPartitioner() == metadata.partitioner; this.partitionKey = partitionKey; this.clusteringIndexFilter = clusteringIndexFilter; @@ -90,6 +91,44 @@ public class SinglePartitionReadCommand extends ReadCommand /** * Creates a new read command on a single partition. * + * @param isForThrift whether the query is for thrift or not. + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param columnFilter the column filter to use for the query. + * @param rowFilter the row filter to use for the query. + * @param limits the limits to use for the query. + * @param partitionKey the partition key for the partition to query. + * @param clusteringIndexFilter the clustering index filter to use for the query. + * @param indexMetadata explicitly specified index to use for the query + * + * @return a newly created read command. + */ + public static SinglePartitionReadCommand create(boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter, + IndexMetadata indexMetadata) + { + return new SinglePartitionReadCommand(false, + 0, + isForThrift, + metadata, + nowInSec, + columnFilter, + rowFilter, + limits, + partitionKey, + clusteringIndexFilter, + indexMetadata); + } + + /** + * Creates a new read command on a single partition. + * * @param metadata the table to query. * @param nowInSec the time in seconds to use are "now" for this query. * @param columnFilter the column filter to use for the query. 
@@ -112,7 +151,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     }

     /**
-     * Creates a new read command on a single partition for thrift.
+     * Creates a new read command on a single partition.
      *
      * @param isForThrift whether the query is for thrift or not.
      * @param metadata the table to query.
@@ -134,7 +173,15 @@ public class SinglePartitionReadCommand extends ReadCommand
                                                     DecoratedKey partitionKey,
                                                     ClusteringIndexFilter clusteringIndexFilter)
     {
-        return new SinglePartitionReadCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+        return create(isForThrift,
+                      metadata,
+                      nowInSec,
+                      columnFilter,
+                      rowFilter,
+                      limits,
+                      partitionKey,
+                      clusteringIndexFilter,
+                      findIndex(metadata, rowFilter));
     }

     /**
@@ -148,7 +195,11 @@ public class SinglePartitionReadCommand extends ReadCommand
      *
      * @return a newly created read command. The returned command will use no row filter and have no limits.
      */
-    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
+    public static SinglePartitionReadCommand create(CFMetaData metadata,
+                                                    int nowInSec,
+                                                    DecoratedKey key,
+                                                    ColumnFilter columnFilter,
+                                                    ClusteringIndexFilter filter)
     {
         return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
     }
@@ -164,7 +215,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      */
     public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
     {
-        return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+        return create(metadata, nowInSec, key, Slices.ALL);
     }

     /**
@@ -178,7 +229,7 @@ public class SinglePartitionReadCommand extends ReadCommand
      */
     public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
     {
-        return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
+        return create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
     }

     /**
@@ -211,7 +262,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
     {
         ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
-        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+        return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
     }

     /**
@@ -244,7 +295,7 @@ public class SinglePartitionReadCommand extends ReadCommand
     public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet names)
     {
         ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
-        return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+        return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
     }

     /**
@@ -265,7 +316,82 @@ public class SinglePartitionReadCommand extends ReadCommand

     public SinglePartitionReadCommand copy()
     {
-        return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+        return new SinglePartitionReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              clusteringIndexFilter(),
+                                              indexMetadata());
+    }
+
+    public SinglePartitionReadCommand copyAsDigestQuery()
+    {
+        return new SinglePartitionReadCommand(true,
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              clusteringIndexFilter(),
+                                              indexMetadata());
+    }
+
+    public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter)
+    {
+        return new SinglePartitionReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              partitionKey(),
+                                              filter,
+                                              indexMetadata());
+    }
+
+    static SinglePartitionReadCommand legacySliceCommand(boolean isDigest,
+                                                         int digestVersion,
+                                                         CFMetaData metadata,
+                                                         int nowInSec,
+                                                         ColumnFilter columnFilter,
+                                                         DataLimits limits,
+                                                         DecoratedKey partitionKey,
+                                                         ClusteringIndexSliceFilter filter)
+    {
+        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+        return new SinglePartitionReadCommand(isDigest,
+                                              digestVersion,
+                                              true,
+                                              metadata,
+                                              nowInSec,
+                                              columnFilter,
+                                              RowFilter.NONE,
+                                              limits,
+                                              partitionKey,
+                                              filter,
+                                              null);
+    }
+
+    static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest,
+                                                         int digestVersion,
+                                                         CFMetaData metadata,
+                                                         int nowInSec,
+                                                         ColumnFilter columnFilter,
+                                                         DecoratedKey partitionKey,
+                                                         ClusteringIndexNamesFilter filter)
+    {
+        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+        return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter,null);
     }

     public DecoratedKey partitionKey()
@@ -432,7 +558,7 @@ public class SinglePartitionReadCommand extends ReadCommand
         final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();

         @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
-        UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
+        UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
         try
         {
             // Use a custom iterator instead of DataLimits to avoid stopping the original iterator
@@ -1068,12 +1194,22 @@ public class SinglePartitionReadCommand extends ReadCommand
     private static class Deserializer extends SelectionDeserializer
     {
-        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional index)
+        public ReadCommand deserialize(DataInputPlus in,
+                                       int version,
+                                       boolean isDigest,
+                                       int digestVersion,
+                                       boolean isForThrift,
+                                       CFMetaData metadata,
+                                       int nowInSec,
+                                       ColumnFilter columnFilter,
+                                       RowFilter rowFilter,
+                                       DataLimits limits,
+                                       IndexMetadata index)
         throws IOException
         {
             DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize()));
             ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
-            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
+            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
index dd6dde4..5976ddf 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.IndexTarget;
@@ -697,17 +698,17 @@ public class SecondaryIndexManager implements IndexRegistry
      * cached for future use when obtaining a Searcher, getting the index's underlying CFS for
      * ReadOrderGroup, or an estimate of the result size from an average index query.
      *
-     * @param command ReadCommand to be executed
+     * @param rowFilter RowFilter of the command to be executed
      * @return an Index instance, ready to use during execution of the command, or null if none
      * of the registered indexes can support the command.
      */
-    public Index getBestIndexFor(ReadCommand command)
+    public Index getBestIndexFor(RowFilter rowFilter)
     {
-        if (indexes.isEmpty() || command.rowFilter().isEmpty())
+        if (indexes.isEmpty() || rowFilter.isEmpty())
             return null;

         Set searchableIndexes = new HashSet<>();
-        for (RowFilter.Expression expression : command.rowFilter())
+        for (RowFilter.Expression expression : rowFilter)
         {
             if (expression.isCustom())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index 135839b..f8a7c66 100644
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -136,13 +136,15 @@ public class CompositesSearcher extends CassandraIndexSearcher

                 // Query the gathered index hits. We still need to filter stale hits from the resulting query.
                 ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
+                SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
+                                                                                       index.baseCfs.metadata,
                                                                                        command.nowInSec(),
                                                                                        command.columnFilter(),
                                                                                        command.rowFilter(),
                                                                                        DataLimits.NONE,
                                                                                        partitionKey,
-                                                                                       filter);
+                                                                                       filter,
+                                                                                       null);
                 @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
                 // by the next caller of next, or through closing this iterator is this come before.
                 UnfilteredRowIterator dataIter =

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
index 189b652..c14c5a7 100644
--- a/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
@@ -99,7 +99,8 @@ public class KeysSearcher extends CassandraIndexSearcher
                                                                            command.rowFilter(),
                                                                            DataLimits.NONE,
                                                                            key,
-                                                                           command.clusteringIndexFilter(key));
+                                                                           command.clusteringIndexFilter(key),
+                                                                           null);

             @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null.
             // Otherwise, we close right away if empty, and if it's assigned to next it will be called either

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index cae1f1a..177fdb2 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -88,7 +88,7 @@ public abstract class AbstractReadExecutor

     protected void makeDigestRequests(Iterable endpoints)
     {
-        makeRequests(command.copy().setIsDigestQuery(true), endpoints);
+        makeRequests(command.copyAsDigestQuery(), endpoints);
     }

     private void makeRequests(ReadCommand readCommand, Iterable endpoints)
@@ -284,7 +284,7 @@ public abstract class AbstractReadExecutor
             // Could be waiting on the data, or on enough digests.
             ReadCommand retryCommand = command;
             if (handler.resolver.isDataPresent())
-                retryCommand = command.copy().setIsDigestQuery(true);
+                retryCommand = command.copyAsDigestQuery();

             InetAddress extraReplica = Iterables.getLast(targetReplicas);
             if (traceState != null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 9c216e3..ea79017 100644
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.service.pager;

-import java.util.Optional;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -27,8 +25,6 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.index.Index;
-import org.apache.cassandra.schema.IndexMetadata;

 /**
  * Pages a PartitionRangeReadCommand.
@@ -90,9 +86,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager
             }
         }

-        Index index = command.getIndex(Keyspace.openAndGetStore(command.metadata()));
-        Optional indexMetadata = index != null ? Optional.of(index.getIndexMetadata()) : Optional.empty();
-        return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, indexMetadata);
+        return ((PartitionRangeReadCommand) command).withUpdatedLimitsAndDataRange(limits, pageRange);
     }

     protected void recordLast(DecoratedKey key, Row last)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 86caac3..cb74b15 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -48,7 +48,6 @@ import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.Index;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.DynamicEndpointSnitch;
 import org.apache.cassandra.metrics.ClientMetrics;
@@ -1520,16 +1519,16 @@ public class CassandraServer implements Cassandra.Iface
             ColumnFilter columns = makeColumnFilter(metadata, column_parent, predicate);
             ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
             DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
-            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                          0,
-                                                                          true,
-                                                                          metadata,
-                                                                          nowInSec,
-                                                                          columns,
-                                                                          ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
-                                                                          limits,
-                                                                          new DataRange(bounds, filter),
-                                                                          Optional.empty());
+
+            PartitionRangeReadCommand cmd =
+                PartitionRangeReadCommand.create(true,
+                                                 metadata,
+                                                 nowInSec,
+                                                 columns,
+                                                 ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
+                                                 limits,
+                                                 new DataRange(bounds, filter));
+
             try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
             {
                 assert results != null;
@@ -1614,16 +1613,16 @@ public class CassandraServer implements Cassandra.Iface
             Clustering pageFrom = metadata.isSuper()
                                 ? new Clustering(start_column)
                                 : LegacyLayout.decodeCellName(metadata, start_column).clustering;
-            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                          0,
-                                                                          true,
-                                                                          metadata,
-                                                                          nowInSec,
-                                                                          ColumnFilter.all(metadata),
-                                                                          RowFilter.NONE,
-                                                                          limits,
-                                                                          new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
-                                                                          Optional.empty());
+
+            PartitionRangeReadCommand cmd =
+                PartitionRangeReadCommand.create(true,
+                                                 metadata,
+                                                 nowInSec,
+                                                 ColumnFilter.all(metadata),
+                                                 RowFilter.NONE,
+                                                 limits,
+                                                 new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+
             try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
             {
                 return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
@@ -1706,21 +1705,17 @@ public class CassandraServer implements Cassandra.Iface
             ColumnFilter columns = makeColumnFilter(metadata, column_parent, column_predicate);
             ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate);
             DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate);
-            PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                          0,
-                                                                          true,
-                                                                          metadata,
-                                                                          nowInSec,
-                                                                          columns,
-                                                                          ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
-                                                                          limits,
-                                                                          new DataRange(bounds, filter),
-                                                                          Optional.empty());
-            // If there's a secondary index that the command can use, have it validate
-            // the request parameters. Note that as a side effect, if a viable Index is
-            // identified by the CFS's index manager, it will be cached in the command
-            // and serialized during distribution to replicas in order to avoid performing
-            // further lookups.
+
+            PartitionRangeReadCommand cmd =
+                PartitionRangeReadCommand.create(true,
+                                                 metadata,
+                                                 nowInSec,
+                                                 columns,
+                                                 ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
+                                                 limits,
+                                                 new DataRange(bounds, filter));
+
+            // If there's a secondary index that the command can use, have it validate the request parameters.
             cmd.maybeValidateIndex();

             try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
@@ -2533,7 +2528,7 @@ public class CassandraServer implements Cassandra.Iface
             // We want to know if the partition exists, so just fetch a single cell.
             ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
             DataLimits limits = DataLimits.thriftLimits(1, 1);
-            return new SinglePartitionReadCommand(false, 0, true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
+            return SinglePartitionReadCommand.create(true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter);
         }

         // Gather the clustering for the expected values and query those.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index e8b42bc..d758efe 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -629,31 +629,7 @@ public class Util
                                                              ColumnFamilyStore cfs,
                                                              ReadOrderGroup orderGroup)
     {
-        return new InternalPartitionRangeReadCommand(command).queryStorageInternal(cfs, orderGroup);
-    }
-
-    private static final class InternalPartitionRangeReadCommand extends PartitionRangeReadCommand
-    {
-
-        private InternalPartitionRangeReadCommand(PartitionRangeReadCommand original)
-        {
-            super(original.isDigestQuery(),
-                  original.digestVersion(),
-                  original.isForThrift(),
-                  original.metadata(),
-                  original.nowInSec(),
-                  original.columnFilter(),
-                  original.rowFilter(),
-                  original.limits(),
-                  original.dataRange(),
-                  Optional.empty());
-        }
-
-        private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs,
-                                                                 ReadOrderGroup orderGroup)
-        {
-            return queryStorage(cfs, orderGroup);
-        }
+        return command.queryStorage(cfs, orderGroup);
     }

     public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index bbccc48..2457c4a 100644
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@ -118,7 +118,7 @@ public class SecondaryIndexTest
                                       .filterOn("birthdate", Operator.EQ, 1L)
                                       .build();

-        Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+        Index.Searcher searcher = rc.index().searcherFor(rc);
         try (ReadOrderGroup orderGroup = rc.startOrderGroup(); UnfilteredPartitionIterator pi = searcher.search(orderGroup))
         {
             assertTrue(pi.hasNext());
@@ -204,7 +204,7 @@ public class SecondaryIndexTest

         // verify that it's not being indexed under any other value either
         ReadCommand rc = Util.cmd(cfs).build();
-        assertNull(cfs.indexManager.getBestIndexFor(rc));
+        assertNull(rc.index());

         // resurrect w/ a newer timestamp
         new RowUpdateBuilder(cfs.metadata, 2, "k1").clustering("c").add("birthdate", 1L).build().apply();;
@@ -222,13 +222,13 @@ public class SecondaryIndexTest
         // todo - checking the # of index searchers for the command is probably not the best thing to test here
         RowUpdateBuilder.deleteRow(cfs.metadata, 3, "k1", "c").applyUnsafe();
         rc = Util.cmd(cfs).build();
-        assertNull(cfs.indexManager.getBestIndexFor(rc));
+        assertNull(rc.index());

         // make sure obsolete mutations don't generate an index entry
         // todo - checking the # of index searchers for the command is probably not the best thing to test here
         new RowUpdateBuilder(cfs.metadata, 3, "k1").clustering("c").add("birthdate", 1L).build().apply();;
         rc = Util.cmd(cfs).build();
-        assertNull(cfs.indexManager.getBestIndexFor(rc));
+        assertNull(rc.index());
     }

     @Test
@@ -504,7 +504,7 @@ public class SecondaryIndexTest
         ColumnDefinition cdef = cfs.metadata.getColumnDefinition(col);
         ReadCommand rc = Util.cmd(cfs).filterOn(cdef.name.toString(), Operator.EQ, ((AbstractType) cdef.cellValueType()).decompose(val)).build();
-        Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+        Index.Searcher searcher = rc.index().searcherFor(rc);
         if (count != 0)
             assertNotNull(searcher);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 7f59e2f..02b642e 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -116,13 +116,14 @@ public class SinglePartitionSliceCommandTest
         ByteBuffer zero = ByteBufferUtil.bytes(0);
         Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero)));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);

         DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21));
         ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21);
@@ -166,13 +167,14 @@ public class SinglePartitionSliceCommandTest

         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);

         // check raw iterator for static cell
         try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
@@ -224,13 +226,14 @@ public class SinglePartitionSliceCommandTest
         ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
         Slice slice = Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
         ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false);
-        ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                         FBUtilities.nowInSeconds(),
-                                                         columnFilter,
-                                                         RowFilter.NONE,
-                                                         DataLimits.NONE,
-                                                         key,
-                                                         sliceFilter);
+        ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                            cfm,
+                                                            FBUtilities.nowInSeconds(),
+                                                            columnFilter,
+                                                            RowFilter.NONE,
+                                                            DataLimits.NONE,
+                                                            key,
+                                                            sliceFilter);

         String ret = cmd.toCQLString();
         Assert.assertNotNull(ret);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7f297bcf/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 640b68b..c2598ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -604,7 +604,7 @@ public class SSTableReaderTest
                              .columns("birthdate")
                              .filterOn("birthdate", Operator.EQ, 1L)
                              .build();
-        Index.Searcher searcher = indexedCFS.indexManager.getBestIndexFor(rc).searcherFor(rc);
+        Index.Searcher searcher = rc.index().searcherFor(rc);
         assertNotNull(searcher);
         try (ReadOrderGroup orderGroup = ReadOrderGroup.forCommand(rc))
         {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org
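
Note on the AbstractReadExecutor hunks in this commit: they replace `command.copy().setIsDigestQuery(true)` with `command.copyAsDigestQuery()`. The sketch below shows that pattern in isolation, assuming the intent is to fix the digest flag at construction time instead of mutating it on an instance that other threads may be serializing. `ReadCommandSketch` and its two fields are hypothetical stand-ins for illustration, not Cassandra classes.

```java
// Hypothetical, simplified stand-in for a read command; not Cassandra's actual class.
final class ReadCommandSketch
{
    // Both fields are final: the digest flag is fixed at construction and never mutated.
    private final boolean isDigestQuery;
    private final int nowInSec;

    ReadCommandSketch(boolean isDigestQuery, int nowInSec)
    {
        this.isDigestQuery = isDigestQuery;
        this.nowInSec = nowInSec;
    }

    boolean isDigestQuery()
    {
        return isDigestQuery;
    }

    int nowInSec()
    {
        return nowInSec;
    }

    // Returns a new instance with the digest flag set; the receiver is left untouched,
    // so a thread that is still reading the original never observes a changed flag.
    ReadCommandSketch copyAsDigestQuery()
    {
        return new ReadCommandSketch(true, nowInSec);
    }

    public static void main(String[] args)
    {
        ReadCommandSketch data = new ReadCommandSketch(false, 1503938282);
        ReadCommandSketch digest = data.copyAsDigestQuery();
        System.out.println(data.isDigestQuery() + " " + digest.isDigestQuery());
    }
}
```

With a mutable setter, the data request and the digest request could end up sharing state that changes while one of them is being written out; with the copy-constructor style above, each request holds its own immutable view.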