cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [10/10] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Date Wed, 30 Aug 2017 16:38:10 GMT
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e4d000c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e4d000c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e4d000c

Branch: refs/heads/trunk
Commit: 3e4d000c9e3ffa2df88c32d78c866e0598898dd4
Parents: 76efcc6 7ad1945
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Wed Aug 30 17:32:09 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Wed Aug 30 17:37:41 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/SelectStatement.java        |  16 +-
 .../db/AbstractReadCommandBuilder.java          |   2 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 112 +++++++++++---
 .../org/apache/cassandra/db/ReadCommand.java    | 131 +++++++++-------
 .../db/SinglePartitionReadCommand.java          | 148 ++++++++++++++-----
 .../cassandra/index/SecondaryIndexManager.java  |   8 +-
 .../internal/composites/CompositesSearcher.java |   3 +-
 .../index/internal/keys/KeysSearcher.java       |   3 +-
 .../cassandra/service/AbstractReadExecutor.java |   4 +-
 .../service/pager/PartitionRangeQueryPager.java |   8 +-
 test/unit/org/apache/cassandra/Util.java        |  25 +---
 .../apache/cassandra/db/SecondaryIndexTest.java |  10 +-
 .../db/SinglePartitionSliceCommandTest.java     |  29 ++--
 .../cassandra/index/sasi/SASIIndexTest.java     |  43 +++---
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 .../cassandra/service/ReadExecutorTest.java     |   2 +-
 17 files changed, 338 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 77eebcf,85efafb..84fef5e
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -569,18 -552,10 +569,10 @@@ public class SelectStatement implement
          if (keyBounds == null)
              return ReadQuery.EMPTY;
  
-         PartitionRangeReadCommand command = new PartitionRangeReadCommand(table,
-                                                                           nowInSec,
-                                                                           columnFilter,
-                                                                           rowFilter,
-                                                                           limit,
-                                                                           new DataRange(keyBounds, clusteringIndexFilter),
-                                                                           Optional.empty());
-         // If there's a secondary index that the command can use, have it validate
-         // the request parameters. Note that as a side effect, if a viable Index is
-         // identified by the CFS's index manager, it will be cached in the command
-         // and serialized during distribution to replicas in order to avoid performing
-         // further lookups.
+         PartitionRangeReadCommand command =
 -            PartitionRangeReadCommand.create(false, cfm, nowInSec, queriedColumns, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
++            PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+ 
+         // If there's a secondary index that the command can use, have it validate the request parameters.
          command.maybeValidateIndex();
  
          return command;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
index 8ced1c7,1c69813..481e906
--- a/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
@@@ -336,10 -336,10 +336,10 @@@ public abstract class AbstractReadComma
              else
                  bounds = new ExcludingBounds<>(start, end);
  
-             return new PartitionRangeReadCommand(cfs.metadata(), nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()), Optional.empty());
 -            return PartitionRangeReadCommand.create(false, cfs.metadata, nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
++            return PartitionRangeReadCommand.create(cfs.metadata(), nowInSeconds, makeColumnFilter(), filter, makeLimits(), new DataRange(bounds, makeFilter()));
          }
  
 -        static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
 +        static DecoratedKey makeKey(TableMetadata metadata, Object... partitionKey)
          {
              if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
                  return (DecoratedKey)partitionKey[0];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index da7daa7,f7b6660..e88f7fb
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@@ -20,11 -20,11 +20,11 @@@ package org.apache.cassandra.db
  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.List;
- import java.util.Optional;
  
+ import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.Iterables;
  
 -import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.lifecycle.View;
@@@ -59,30 -60,39 +59,36 @@@ public class PartitionRangeReadCommand 
      private final DataRange dataRange;
      private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
  
-     public PartitionRangeReadCommand(boolean isDigest,
+     private PartitionRangeReadCommand(boolean isDigest,
 -                                      int digestVersion,
 -                                      boolean isForThrift,
 -                                      CFMetaData metadata,
 -                                      int nowInSec,
 -                                      ColumnFilter columnFilter,
 -                                      RowFilter rowFilter,
 -                                      DataLimits limits,
 -                                      DataRange dataRange,
 -                                      IndexMetadata index)
 +                                     int digestVersion,
 +                                     TableMetadata metadata,
 +                                     int nowInSec,
 +                                     ColumnFilter columnFilter,
 +                                     RowFilter rowFilter,
 +                                     DataLimits limits,
 +                                     DataRange dataRange,
-                                      Optional<IndexMetadata> index)
++                                     IndexMetadata index)
      {
-         super(Kind.PARTITION_RANGE, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits);
 -        super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
++        super(Kind.PARTITION_RANGE, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index);
          this.dataRange = dataRange;
-         this.index = index;
      }
  
-     public PartitionRangeReadCommand(TableMetadata metadata,
-                                      int nowInSec,
-                                      ColumnFilter columnFilter,
-                                      RowFilter rowFilter,
-                                      DataLimits limits,
-                                      DataRange dataRange,
-                                      Optional<IndexMetadata> index)
 -    public static PartitionRangeReadCommand create(boolean isForThrift,
 -                                                   CFMetaData metadata,
++    public static PartitionRangeReadCommand create(TableMetadata metadata,
+                                                    int nowInSec,
+                                                    ColumnFilter columnFilter,
+                                                    RowFilter rowFilter,
+                                                    DataLimits limits,
+                                                    DataRange dataRange)
      {
-         this(false, 0, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
+         return new PartitionRangeReadCommand(false,
+                                              0,
 -                                             isForThrift,
+                                              metadata,
+                                              nowInSec,
+                                              columnFilter,
+                                              rowFilter,
+                                              limits,
+                                              dataRange,
+                                              findIndex(metadata, rowFilter));
      }
  
      /**
@@@ -93,9 -103,10 +99,11 @@@
       *
       * @return a newly created read command that queries everything in the table.
       */
 -    public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
 +    public static PartitionRangeReadCommand allDataRead(TableMetadata metadata, int nowInSec)
      {
-         return new PartitionRangeReadCommand(metadata,
 -        return new PartitionRangeReadCommand(false, 0, false,
++        return new PartitionRangeReadCommand(false,
++                                             0,
+                                              metadata,
                                               nowInSec,
                                               ColumnFilter.all(metadata),
                                               RowFilter.NONE,
@@@ -142,18 -152,86 +149,67 @@@
          // DataLimits.CQLGroupByLimits.GroupByAwareCounter assumes that if GroupingState.hasClustering(), then we're in
          // the middle of a group, but we can't make that assumption if we query and range "in advance" of where we are
          // on the ring.
-         DataLimits newLimits = isRangeContinuation ? limits() : limits().withoutState();
-         return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, newRange, index);
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
 -                                             isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              isRangeContinuation ? limits() : limits().withoutState(),
+                                              dataRange().forSubRange(range),
+                                              indexMetadata());
      }
  
      public PartitionRangeReadCommand copy()
      {
-         return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
 -                                             isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              dataRange(),
+                                              indexMetadata());
+     }
+ 
+     public PartitionRangeReadCommand copyAsDigestQuery()
+     {
+         return new PartitionRangeReadCommand(true,
+                                              digestVersion(),
 -                                             isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              dataRange(),
+                                              indexMetadata());
+     }
+ 
+     public ReadCommand withUpdatedLimit(DataLimits newLimits)
+     {
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
 -                                             isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              newLimits,
+                                              dataRange(),
+                                              indexMetadata());
      }
  
-     public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
 -    public PartitionRangeReadCommand withUpdatedDataRange(DataRange newDataRange)
 -    {
 -        return new PartitionRangeReadCommand(isDigestQuery(),
 -                                             digestVersion(),
 -                                             isForThrift(),
 -                                             metadata(),
 -                                             nowInSec(),
 -                                             columnFilter(),
 -                                             rowFilter(),
 -                                             limits(),
 -                                             newDataRange,
 -                                             indexMetadata());
 -    }
 -
+     public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
      {
-         return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
 -                                             isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              newLimits,
+                                              newDataRange,
+                                              indexMetadata());
      }
  
      public long getTimeout()
@@@ -194,10 -272,11 +250,11 @@@
          metric.rangeLatency.addNano(latencyNanos);
      }
  
-     protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
+     @VisibleForTesting
+     public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
      {
          ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
 -        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
 +        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().partitionKeyType));
  
          // fetch data from current memtable, historical memtables, and SSTables in the correct order.
          final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
@@@ -356,7 -438,17 +413,16 @@@
  
      private static class Deserializer extends SelectionDeserializer
      {
-         public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+         public ReadCommand deserialize(DataInputPlus in,
+                                        int version,
+                                        boolean isDigest,
+                                        int digestVersion,
 -                                       boolean isForThrift,
 -                                       CFMetaData metadata,
++                                       TableMetadata metadata,
+                                        int nowInSec,
+                                        ColumnFilter columnFilter,
+                                        RowFilter rowFilter,
+                                        DataLimits limits,
+                                        IndexMetadata index)
          throws IOException
          {
              DataRange range = DataRange.serializer.deserialize(in, version, metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 08224bf,54389f0..e135902
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -18,9 -18,13 +18,10 @@@
  package org.apache.cassandra.db;
  
  import java.io.IOException;
 -import java.nio.ByteBuffer;
--import java.util.*;
  import java.util.function.Predicate;
  
+ import javax.annotation.Nullable;
+ 
 -import com.google.common.collect.Lists;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -32,22 -37,23 +33,22 @@@ import org.apache.cassandra.db.partitio
  import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.transform.StoppingTransformation;
  import org.apache.cassandra.db.transform.Transformation;
 -import org.apache.cassandra.dht.AbstractBounds;
++import org.apache.cassandra.exceptions.UnknownIndexException;
  import org.apache.cassandra.index.Index;
  import org.apache.cassandra.index.IndexNotAvailableException;
 -import org.apache.cassandra.io.ForwardingVersionedSerializer;
  import org.apache.cassandra.io.IVersionedSerializer;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
  import org.apache.cassandra.metrics.TableMetrics;
  import org.apache.cassandra.net.MessageOut;
 -import org.apache.cassandra.net.MessagingService;
  import org.apache.cassandra.schema.IndexMetadata;
 -import org.apache.cassandra.schema.UnknownIndexException;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
 +import org.apache.cassandra.schema.TableId;
 +import org.apache.cassandra.schema.TableMetadata;
- import org.apache.cassandra.exceptions.UnknownIndexException;
  import org.apache.cassandra.service.ClientWarn;
  import org.apache.cassandra.tracing.Tracing;
 -import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.Pair;
  
  /**
   * General interface for storage-engine read commands (common to both range and
@@@ -69,23 -112,27 +70,25 @@@ public abstract class ReadCommand exten
      private final RowFilter rowFilter;
      private final DataLimits limits;
  
-     // SecondaryIndexManager will attempt to provide the most selective of any available indexes
-     // during execution. Here we also store an the results of that lookup to repeating it over
-     // the lifetime of the command.
-     protected Optional<IndexMetadata> index = Optional.empty();
- 
-     // Flag to indicate whether the index manager has been queried to select an index for this
-     // command. This is necessary as the result of that lookup may be null, in which case we
-     // still don't want to repeat it.
-     private boolean indexManagerQueried = false;
- 
-     private boolean isDigestQuery;
+     private final boolean isDigestQuery;
      // if a digest query, the version for which the digest is expected. Ignored if not a digest.
      private int digestVersion;
 -    private final boolean isForThrift;
  
+     @Nullable
+     private final IndexMetadata index;
+ 
      protected static abstract class SelectionDeserializer
      {
-         public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) throws IOException;
+         public abstract ReadCommand deserialize(DataInputPlus in,
+                                                 int version,
+                                                 boolean isDigest,
+                                                 int digestVersion,
 -                                                boolean isForThrift,
 -                                                CFMetaData metadata,
++                                                TableMetadata metadata,
+                                                 int nowInSec,
+                                                 ColumnFilter columnFilter,
+                                                 RowFilter rowFilter,
+                                                 DataLimits limits,
+                                                 IndexMetadata index) throws IOException;
      }
  
      protected enum Kind
@@@ -249,6 -288,40 +242,30 @@@
      }
  
      /**
 -     * Whether this query is for thrift or not.
 -     *
 -     * @return whether this query is for thrift.
 -     */
 -    public boolean isForThrift()
 -    {
 -        return isForThrift;
 -    }
 -
 -    /**
+      * Index (metadata) chosen for this query. Can be null.
+      *
+      * @return index (metadata) chosen for this query
+      */
+     @Nullable
+     public IndexMetadata indexMetadata()
+     {
+         return index;
+     }
+ 
+     /**
+      *  Index instance chosen for this query. Can be null.
+      *
+      * @return Index instance chosen for this query. Can be null.
+      */
+     @Nullable
+     public Index index()
+     {
+         return null == index
+              ? null
+              : Keyspace.openAndGetStore(metadata).indexManager.getIndex(index);
+     }
+ 
+     /**
       * The clustering index filter this command to use for the provided key.
       * <p>
       * Note that that method should only be called on a key actually queried by this command
@@@ -289,25 -366,23 +310,23 @@@
  
      public Index getIndex(ColumnFamilyStore cfs)
      {
-         // if we've already consulted the index manager, and it returned a valid index
-         // the result should be cached here.
-         if(index.isPresent())
-             return cfs.indexManager.getIndex(index.get());
- 
-         // if no cached index is present, but we've already consulted the index manager
-         // then no registered index is suitable for this command, so just return null.
-         if (indexManagerQueried)
+         return null != index
+              ? cfs.indexManager.getIndex(index)
+              : null;
+     }
+ 
 -    static IndexMetadata findIndex(CFMetaData table, RowFilter rowFilter)
++    static IndexMetadata findIndex(TableMetadata table, RowFilter rowFilter)
+     {
 -        if (table.getIndexes().isEmpty() || rowFilter.isEmpty())
++        if (table.indexes.isEmpty() || rowFilter.isEmpty())
              return null;
  
-         // do the lookup, set the flag to indicate so and cache the result if not null
-         Index selected = cfs.indexManager.getBestIndexFor(this);
-         indexManagerQueried = true;
+         ColumnFamilyStore cfs = Keyspace.openAndGetStore(table);
  
-         if (selected == null)
-             return null;
+         Index index = cfs.indexManager.getBestIndexFor(rowFilter);
  
-         index = Optional.of(selected.getIndexMetadata());
-         return selected;
+         return null != index
+              ? index.getIndexMetadata()
+              : null;
      }
  
      /**
@@@ -619,11 -695,13 +638,11 @@@
  
          public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
          {
 -            assert version >= MessagingService.VERSION_30;
 -
              out.writeByte(command.kind.ordinal());
-             out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(command.index.isPresent()));
 -            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(null != command.index));
++            out.writeByte(digestFlag(command.isDigestQuery()) | indexFlag(null != command.indexMetadata()));
              if (command.isDigestQuery())
                  out.writeUnsignedVInt(command.digestVersion());
 -            CFMetaData.serializer.serialize(command.metadata(), out, version);
 +            command.metadata.id.serialize(out);
              out.writeInt(command.nowInSec());
              ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
              RowFilter.serializer.serialize(command.rowFilter(), out, version);
@@@ -653,19 -726,17 +672,17 @@@
              int nowInSec = in.readInt();
              ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
              RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
-             DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator);
-             Optional<IndexMetadata> index = hasIndex
-                                           ? deserializeIndexMetadata(in, version, metadata)
-                                           : Optional.empty();
+             DataLimits limits = DataLimits.serializer.deserialize(in, version,  metadata.comparator);
+             IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;
  
 -            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
 +            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index);
          }
  
-         private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException
 -        private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
++        private IndexMetadata deserializeIndexMetadata(DataInputPlus in, int version, TableMetadata metadata) throws IOException
          {
              try
              {
-                 return Optional.of(IndexMetadata.serializer.deserialize(in, version, metadata));
 -                return IndexMetadata.serializer.deserialize(in, version, cfm);
++                return IndexMetadata.serializer.deserialize(in, version, metadata);
              }
              catch (UnknownIndexException e)
              {
@@@ -673,8 -744,8 +690,8 @@@
                              "If an index was just created, this is likely due to the schema not " +
                              "being fully propagated. Local read will proceed without using the " +
                              "index. Please wait for schema agreement after index creation.",
 -                            cfm.ksName, cfm.cfName, e.indexId);
 +                            metadata.keyspace, metadata.name, e.indexId);
-                 return Optional.empty();
+                 return null;
              }
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index bd65535,c7080e7..f4f36d8
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -22,6 -22,6 +22,7 @@@ import java.nio.ByteBuffer
  import java.util.*;
  import java.util.stream.Collectors;
  
++import com.google.common.annotations.VisibleForTesting;
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
  
@@@ -71,17 -73,19 +72,19 @@@ public class SinglePartitionReadComman
  
      private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
  
-     public SinglePartitionReadCommand(boolean isDigest,
-                                       int digestVersion,
-                                       TableMetadata metadata,
-                                       int nowInSec,
-                                       ColumnFilter columnFilter,
-                                       RowFilter rowFilter,
-                                       DataLimits limits,
-                                       DecoratedKey partitionKey,
-                                       ClusteringIndexFilter clusteringIndexFilter)
 -    private SinglePartitionReadCommand(boolean isDigest,
 -                                       int digestVersion,
 -                                       boolean isForThrift,
 -                                       CFMetaData metadata,
 -                                       int nowInSec,
 -                                       ColumnFilter columnFilter,
 -                                       RowFilter rowFilter,
 -                                       DataLimits limits,
 -                                       DecoratedKey partitionKey,
 -                                       ClusteringIndexFilter clusteringIndexFilter,
 -                                       IndexMetadata index)
++    @VisibleForTesting
++    protected SinglePartitionReadCommand(boolean isDigest,
++                                         int digestVersion,
++                                         TableMetadata metadata,
++                                         int nowInSec,
++                                         ColumnFilter columnFilter,
++                                         RowFilter rowFilter,
++                                         DataLimits limits,
++                                         DecoratedKey partitionKey,
++                                         ClusteringIndexFilter clusteringIndexFilter,
++                                         IndexMetadata index)
      {
-         super(Kind.SINGLE_PARTITION, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits);
 -        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
++        super(Kind.SINGLE_PARTITION, isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, index);
          assert partitionKey.getPartitioner() == metadata.partitioner;
          this.partitionKey = partitionKey;
          this.clusteringIndexFilter = clusteringIndexFilter;
@@@ -97,6 -102,43 +100,41 @@@
       * @param limits the limits to use for the query.
       * @param partitionKey the partition key for the partition to query.
       * @param clusteringIndexFilter the clustering index filter to use for the query.
+      * @param indexMetadata explicitly specified index to use for the query
+      *
+      * @return a newly created read command.
+      */
 -    public static SinglePartitionReadCommand create(boolean isForThrift,
 -                                                    CFMetaData metadata,
++    public static SinglePartitionReadCommand create(TableMetadata metadata,
+                                                     int nowInSec,
+                                                     ColumnFilter columnFilter,
+                                                     RowFilter rowFilter,
+                                                     DataLimits limits,
+                                                     DecoratedKey partitionKey,
+                                                     ClusteringIndexFilter clusteringIndexFilter,
+                                                     IndexMetadata indexMetadata)
+     {
+         return new SinglePartitionReadCommand(false,
+                                               0,
 -                                              isForThrift,
+                                               metadata,
+                                               nowInSec,
+                                               columnFilter,
+                                               rowFilter,
+                                               limits,
+                                               partitionKey,
+                                               clusteringIndexFilter,
+                                               indexMetadata);
+     }
+ 
+     /**
+      * Creates a new read command on a single partition.
+      *
+      * @param metadata the table to query.
+      * @param nowInSec the time in seconds to use as "now" for this query.
+      * @param columnFilter the column filter to use for the query.
+      * @param rowFilter the row filter to use for the query.
+      * @param limits the limits to use for the query.
+      * @param partitionKey the partition key for the partition to query.
+      * @param clusteringIndexFilter the clustering index filter to use for the query.
       *
       * @return a newly created read command.
       */
@@@ -108,7 -176,15 +146,14 @@@
                                                      DecoratedKey partitionKey,
                                                      ClusteringIndexFilter clusteringIndexFilter)
      {
-         return new SinglePartitionReadCommand(false, 0, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
 -        return create(isForThrift,
 -                      metadata,
++        return create(metadata,
+                       nowInSec,
+                       columnFilter,
+                       rowFilter,
+                       limits,
+                       partitionKey,
+                       clusteringIndexFilter,
+                       findIndex(metadata, rowFilter));
      }
  
      /**
@@@ -122,7 -198,11 +167,11 @@@
       *
       * @return a newly created read command. The returned command will use no row filter and have no limits.
       */
-     public static SinglePartitionReadCommand create(TableMetadata metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter)
 -    public static SinglePartitionReadCommand create(CFMetaData metadata,
++    public static SinglePartitionReadCommand create(TableMetadata metadata,
+                                                     int nowInSec,
+                                                     DecoratedKey key,
+                                                     ColumnFilter columnFilter,
+                                                     ClusteringIndexFilter filter)
      {
          return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter);
      }
@@@ -136,9 -216,9 +185,9 @@@
       *
       * @return a newly created read command that queries all the rows of {@code key}.
       */
 -    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key)
 +    public static SinglePartitionReadCommand fullPartitionRead(TableMetadata metadata, int nowInSec, DecoratedKey key)
      {
-         return SinglePartitionReadCommand.create(metadata, nowInSec, key, Slices.ALL);
+         return create(metadata, nowInSec, key, Slices.ALL);
      }
  
      /**
@@@ -150,9 -230,9 +199,9 @@@
       *
       * @return a newly created read command that queries all the rows of {@code key}.
       */
 -    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key)
 +    public static SinglePartitionReadCommand fullPartitionRead(TableMetadata metadata, int nowInSec, ByteBuffer key)
      {
-         return SinglePartitionReadCommand.create(metadata, nowInSec, metadata.partitioner.decorateKey(key), Slices.ALL);
 -        return create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL);
++        return create(metadata, nowInSec, metadata.partitioner.decorateKey(key), Slices.ALL);
      }
  
      /**
@@@ -182,10 -262,10 +231,10 @@@
       * @return a newly created read command that queries the {@code slices} in {@code key}. The returned query will
       * query every column for the table (without limit or row filtering) and be in forward order.
       */
 -    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, Slices slices)
 +    public static SinglePartitionReadCommand create(TableMetadata metadata, int nowInSec, DecoratedKey key, Slices slices)
      {
          ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(slices, false);
-         return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+         return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
      }
  
      /**
@@@ -215,10 -295,10 +264,10 @@@
       * @return a newly created read command that queries the {@code names} in {@code key}. The returned query will
       * query every column (without limit or row filtering) and be in forward order.
       */
 -    public static SinglePartitionReadCommand create(CFMetaData metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
 +    public static SinglePartitionReadCommand create(TableMetadata metadata, int nowInSec, DecoratedKey key, NavigableSet<Clustering> names)
      {
          ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(names, false);
-         return SinglePartitionReadCommand.create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
+         return create(metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, DataLimits.NONE, key, filter);
      }
  
      /**
@@@ -239,9 -319,99 +288,46 @@@
  
      public SinglePartitionReadCommand copy()
      {
-         return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+         return new SinglePartitionReadCommand(isDigestQuery(),
+                                               digestVersion(),
 -                                              isForThrift(),
+                                               metadata(),
+                                               nowInSec(),
+                                               columnFilter(),
+                                               rowFilter(),
+                                               limits(),
+                                               partitionKey(),
+                                               clusteringIndexFilter(),
+                                               indexMetadata());
+     }
+ 
+     public SinglePartitionReadCommand copyAsDigestQuery()
+     {
+         return new SinglePartitionReadCommand(true,
+                                               digestVersion(),
 -                                              isForThrift(),
+                                               metadata(),
+                                               nowInSec(),
+                                               columnFilter(),
+                                               rowFilter(),
+                                               limits(),
+                                               partitionKey(),
+                                               clusteringIndexFilter(),
+                                               indexMetadata());
+     }
+ 
+     public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
+     {
+         return new SinglePartitionReadCommand(isDigestQuery(),
+                                               digestVersion(),
 -                                              isForThrift(),
+                                               metadata(),
+                                               nowInSec(),
+                                               columnFilter(),
+                                               rowFilter(),
+                                               newLimits,
+                                               partitionKey(),
+                                               clusteringIndexFilter(),
+                                               indexMetadata());
      }
  
 -    public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter)
 -    {
 -        return new SinglePartitionReadCommand(isDigestQuery(),
 -                                              digestVersion(),
 -                                              isForThrift(),
 -                                              metadata(),
 -                                              nowInSec(),
 -                                              columnFilter(),
 -                                              rowFilter(),
 -                                              limits(),
 -                                              partitionKey(),
 -                                              filter,
 -                                              indexMetadata());
 -    }
 -
 -    static SinglePartitionReadCommand legacySliceCommand(boolean isDigest,
 -                                                         int digestVersion,
 -                                                         CFMetaData metadata,
 -                                                         int nowInSec,
 -                                                         ColumnFilter columnFilter,
 -                                                         DataLimits limits,
 -                                                         DecoratedKey partitionKey,
 -                                                         ClusteringIndexSliceFilter filter)
 -    {
 -        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
 -        return new SinglePartitionReadCommand(isDigest,
 -                                              digestVersion,
 -                                              true,
 -                                              metadata,
 -                                              nowInSec,
 -                                              columnFilter,
 -                                              RowFilter.NONE,
 -                                              limits,
 -                                              partitionKey,
 -                                              filter,
 -                                              null);
 -    }
 -
 -    static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest,
 -                                                         int digestVersion,
 -                                                         CFMetaData metadata,
 -                                                         int nowInSec,
 -                                                         ColumnFilter columnFilter,
 -                                                         DecoratedKey partitionKey,
 -                                                         ClusteringIndexNamesFilter filter)
 -    {
 -        // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
 -        return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter,null);
 -    }
 -
      public DecoratedKey partitionKey()
      {
          return partitionKey;
@@@ -1094,12 -1260,22 +1167,21 @@@
  
      private static class Deserializer extends SelectionDeserializer
      {
-         public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
+         public ReadCommand deserialize(DataInputPlus in,
+                                        int version,
+                                        boolean isDigest,
+                                        int digestVersion,
 -                                       boolean isForThrift,
 -                                       CFMetaData metadata,
++                                       TableMetadata metadata,
+                                        int nowInSec,
+                                        ColumnFilter columnFilter,
+                                        RowFilter rowFilter,
+                                        DataLimits limits,
+                                        IndexMetadata index)
          throws IOException
          {
 -            DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in, DatabaseDescriptor.getMaxValueSize()));
 +            DecoratedKey key = metadata.partitioner.decorateKey(metadata.partitionKeyType.readValue(in, DatabaseDescriptor.getMaxValueSize()));
              ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
-             return new SinglePartitionReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter);
 -            return new SinglePartitionReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
++            return new SinglePartitionReadCommand(isDigest, digestVersion, metadata, nowInSec, columnFilter, rowFilter, limits, key, filter, index);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index eecb55e,f2100db..df5e7ce
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@@ -204,10 -202,10 +204,10 @@@ public class SecondaryIndexTes
  
          // verify that it's not being indexed under any other value either
          ReadCommand rc = Util.cmd(cfs).build();
-         assertNull(cfs.indexManager.getBestIndexFor(rc));
+         assertNull(rc.index());
  
          // resurrect w/ a newer timestamp
 -        new RowUpdateBuilder(cfs.metadata, 2, "k1").clustering("c").add("birthdate", 1L).build().apply();;
 +        new RowUpdateBuilder(cfs.metadata(), 2, "k1").clustering("c").add("birthdate", 1L).build().apply();;
          assertIndexedOne(cfs, col, 1L);
  
          // verify that row and delete w/ older timestamp does nothing
@@@ -220,15 -218,15 +220,15 @@@
  
          // delete the entire row (w/ newer timestamp this time)
          // todo - checking the # of index searchers for the command is probably not the best thing to test here
 -        RowUpdateBuilder.deleteRow(cfs.metadata, 3, "k1", "c").applyUnsafe();
 +        RowUpdateBuilder.deleteRow(cfs.metadata(), 3, "k1", "c").applyUnsafe();
          rc = Util.cmd(cfs).build();
-         assertNull(cfs.indexManager.getBestIndexFor(rc));
+         assertNull(rc.index());
  
          // make sure obsolete mutations don't generate an index entry
          // todo - checking the # of index searchers for the command is probably not the best thing to test here
 -        new RowUpdateBuilder(cfs.metadata, 3, "k1").clustering("c").add("birthdate", 1L).build().apply();;
 +        new RowUpdateBuilder(cfs.metadata(), 3, "k1").clustering("c").add("birthdate", 1L).build().apply();;
          rc = Util.cmd(cfs).build();
-         assertNull(cfs.indexManager.getBestIndexFor(rc));
+         assertNull(rc.index());
      }
  
      @Test
@@@ -533,10 -520,10 +533,10 @@@
      }
      private void assertIndexedCount(ColumnFamilyStore cfs, ByteBuffer col, Object val, int count)
      {
 -        ColumnDefinition cdef = cfs.metadata.getColumnDefinition(col);
 +        ColumnMetadata cdef = cfs.metadata().getColumn(col);
  
          ReadCommand rc = Util.cmd(cfs).filterOn(cdef.name.toString(), Operator.EQ, ((AbstractType) cdef.cellValueType()).decompose(val)).build();
-         Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+         Index.Searcher searcher = rc.index().searcherFor(rc);
          if (count != 0)
              assertNotNull(searcher);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 1d4bdb6,b056da1..f79066b
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@@ -115,15 -174,16 +115,15 @@@ public class SinglePartitionSliceComman
          QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k1', 's')");
          Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM ks.tbl WHERE k='k1'").isEmpty());
  
 -        ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
 +        ColumnFilter columnFilter = ColumnFilter.selection(RegularAndStaticColumns.of(s));
          ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
-         ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, metadata,
-                                                          FBUtilities.nowInSeconds(),
-                                                          columnFilter,
-                                                          RowFilter.NONE,
-                                                          DataLimits.NONE,
-                                                          key,
-                                                          sliceFilter);
 -        ReadCommand cmd = SinglePartitionReadCommand.create(true,
 -                                                            cfm,
++        ReadCommand cmd = SinglePartitionReadCommand.create(metadata,
+                                                             FBUtilities.nowInSeconds(),
+                                                             columnFilter,
+                                                             RowFilter.NONE,
+                                                             DataLimits.NONE,
+                                                             key,
+                                                             sliceFilter);
  
          // check raw iterator for static cell
          try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController))
@@@ -170,19 -230,20 +170,18 @@@
      @Test
      public void toCQLStringIsSafeToCall() throws IOException
      {
 -        DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1"));
 +        DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.bytes("k1"));
  
 -        ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
 +        ColumnFilter columnFilter = ColumnFilter.selection(RegularAndStaticColumns.of(s));
          Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
 -        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false);
 -        ReadCommand cmd = SinglePartitionReadCommand.create(true,
 -                                                            cfm,
 +        ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(metadata.comparator, slice), false);
-         ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, metadata,
-                                                          FBUtilities.nowInSeconds(),
-                                                          columnFilter,
-                                                          RowFilter.NONE,
-                                                          DataLimits.NONE,
-                                                          key,
-                                                          sliceFilter);
- 
++        ReadCommand cmd = SinglePartitionReadCommand.create(metadata,
+                                                             FBUtilities.nowInSeconds(),
+                                                             columnFilter,
+                                                             RowFilter.NONE,
+                                                             DataLimits.NONE,
+                                                             key,
+                                                             sliceFilter);
 -
          String ret = cmd.toCQLString();
          Assert.assertNotNull(ret);
          Assert.assertFalse(ret.isEmpty());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
index 986e604,03d89e1..406832a
--- a/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/sasi/SASIIndexTest.java
@@@ -1304,16 -1307,16 +1304,15 @@@ public class SASIIndexTes
          ColumnFamilyStore store = loadData(data1, true);
  
          RowFilter filter = RowFilter.create();
 -        filter.add(store.metadata.getColumnDefinition(firstName), Operator.LIKE_CONTAINS, AsciiType.instance.fromString("a"));
 +        filter.add(store.metadata().getColumn(firstName), Operator.LIKE_CONTAINS, AsciiType.instance.fromString("a"));
  
-         ReadCommand command = new PartitionRangeReadCommand(store.metadata(),
-                                                             FBUtilities.nowInSeconds(),
-                                                             ColumnFilter.all(store.metadata()),
-                                                             filter,
-                                                             DataLimits.NONE,
-                                                             DataRange.allData(store.metadata().partitioner),
-                                                             Optional.empty());
- 
+         ReadCommand command =
 -            PartitionRangeReadCommand.create(false,
 -                                             store.metadata,
++            PartitionRangeReadCommand.create(store.metadata(),
+                                              FBUtilities.nowInSeconds(),
 -                                             ColumnFilter.all(store.metadata),
++                                             ColumnFilter.all(store.metadata()),
+                                              filter,
+                                              DataLimits.NONE,
 -                                             DataRange.allData(store.metadata.partitioner));
++                                             DataRange.allData(store.metadata().partitioner));
          try
          {
              new QueryPlan(store, command, 0).execute(ReadExecutionController.empty());
@@@ -2267,16 -2270,17 +2266,16 @@@
              put("key1", Pair.create("Pavel", 14));
          }}, false);
  
 -        ColumnIndex index = ((SASIIndex) store.indexManager.getIndexByName("first_name")).getIndex();
 +        ColumnIndex index = ((SASIIndex) store.indexManager.getIndexByName(store.name + "_first_name")).getIndex();
          IndexMemtable beforeFlushMemtable = index.getCurrentMemtable();
  
-         PartitionRangeReadCommand command = new PartitionRangeReadCommand(store.metadata(),
-                                                                           FBUtilities.nowInSeconds(),
-                                                                           ColumnFilter.all(store.metadata()),
-                                                                           RowFilter.NONE,
-                                                                           DataLimits.NONE,
-                                                                           DataRange.allData(store.getPartitioner()),
-                                                                           Optional.empty());
+         PartitionRangeReadCommand command =
 -            PartitionRangeReadCommand.create(false,
 -                                             store.metadata,
++            PartitionRangeReadCommand.create(store.metadata(),
+                                              FBUtilities.nowInSeconds(),
 -                                             ColumnFilter.all(store.metadata),
++                                             ColumnFilter.all(store.metadata()),
+                                              RowFilter.NONE,
+                                              DataLimits.NONE,
+                                              DataRange.allData(store.getPartitioner()));
  
          QueryController controller = new QueryController(store, command, Integer.MAX_VALUE);
          org.apache.cassandra.index.sasi.plan.Expression expression =
@@@ -2408,15 -2412,16 +2407,15 @@@
  
          RowFilter filter = RowFilter.create();
          for (Expression e : expressions)
 -            filter.add(store.metadata.getColumnDefinition(e.name), e.op, e.value);
 +            filter.add(store.metadata().getColumn(e.name), e.op, e.value);
  
-         ReadCommand command = new PartitionRangeReadCommand(store.metadata(),
-                                                             FBUtilities.nowInSeconds(),
-                                                             columnFilter,
-                                                             filter,
-                                                             DataLimits.cqlLimits(maxResults),
-                                                             range,
-                                                             Optional.empty());
+         ReadCommand command =
 -            PartitionRangeReadCommand.create(false,
 -                                             store.metadata,
++            PartitionRangeReadCommand.create(store.metadata(),
+                                              FBUtilities.nowInSeconds(),
+                                              columnFilter,
+                                              filter,
 -                                             DataLimits.thriftLimits(maxResults, DataLimits.NO_LIMIT),
++                                             DataLimits.cqlLimits(maxResults),
+                                              range);
  
          return command.executeLocally(command.executionController());
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e4d000c/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ReadExecutorTest.java
index fca8eca,0000000..7630cc6
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/ReadExecutorTest.java
@@@ -1,215 -1,0 +1,215 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service;
 +
 +import java.net.InetAddress;
 +import java.util.List;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.collect.ImmutableList;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.Util;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.exceptions.ReadFailureException;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.KeyspaceParams;
 +
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.fail;
 +
 +public class ReadExecutorTest
 +{
 +    static Keyspace ks;
 +    static ColumnFamilyStore cfs;
 +    static List<InetAddress> targets;
 +
 +    @BeforeClass
 +    public static void setUpClass() throws Throwable
 +    {
 +        SchemaLoader.loadSchema();
 +        SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar"));
 +        ks = Keyspace.open("Foo");
 +        cfs = ks.getColumnFamilyStore("Bar");
 +        targets = ImmutableList.of(InetAddress.getByName("127.0.0.255"), InetAddress.getByName("127.0.0.254"), InetAddress.getByName("127.0.0.253"));
 +        cfs.sampleLatencyNanos = 0;
 +    }
 +
 +    @Before
 +    public void resetCounters() throws Throwable
 +    {
 +        cfs.metric.speculativeInsufficientReplicas.dec(cfs.metric.speculativeInsufficientReplicas.getCount());
 +        cfs.metric.speculativeRetries.dec(cfs.metric.speculativeRetries.getCount());
 +        cfs.metric.speculativeFailedRetries.dec(cfs.metric.speculativeFailedRetries.getCount());
 +    }
 +
 +    /**
 +     * If speculation would have been beneficial but could not be attempted due to lack of replicas
 +     * count that it occurred
 +     */
 +    @Test
 +    public void testUnableToSpeculate() throws Throwable
 +    {
 +        assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount());
 +        assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount());
 +        AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true);
 +        executor.maybeTryAdditionalReplicas();
 +        try
 +        {
 +            executor.get();
 +            fail();
 +        }
 +        catch (ReadTimeoutException e)
 +        {
 +            //expected
 +        }
 +        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
 +        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
 +
 +        //Shouldn't increment
 +        executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false);
 +        executor.maybeTryAdditionalReplicas();
 +        try
 +        {
 +            executor.get();
 +            fail();
 +        }
 +        catch (ReadTimeoutException e)
 +        {
 +            //expected
 +        }
 +        assertEquals(1, cfs.metric.speculativeInsufficientReplicas.getCount());
 +        assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount());
 +    }
 +
 +    /**
 +     *  Test that speculation, when it is attempted, is counted, and when it succeeds
 +     *  no failure is counted.
 +     */
 +    @Test
 +    public void testSpeculateSucceeded() throws Throwable
 +    {
 +        assertEquals(0, cfs.metric.speculativeRetries.getCount());
 +        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
 +        assertEquals(0, ks.metric.speculativeRetries.getCount());
 +        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
 +        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
 +        executor.maybeTryAdditionalReplicas();
 +        new Thread()
 +        {
 +            @Override
 +            public void run()
 +            {
 +                //Failures end the read promptly but don't require mock data to be supplied
 +                executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
 +                executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
 +                executor.handler.condition.signalAll();
 +            }
 +        }.start();
 +
 +        try
 +        {
 +            executor.get();
 +            fail();
 +        }
 +        catch (ReadFailureException e)
 +        {
 +            //expected
 +        }
 +        assertEquals(1, cfs.metric.speculativeRetries.getCount());
 +        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
 +        assertEquals(1, ks.metric.speculativeRetries.getCount());
 +        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
 +
 +    }
 +
 +    /**
 +     * Test that speculation failure statistics are incremented if speculation occurs
 +     * and the read still times out.
 +     */
 +    @Test
 +    public void testSpeculateFailed() throws Throwable
 +    {
 +        assertEquals(0, cfs.metric.speculativeRetries.getCount());
 +        assertEquals(0, cfs.metric.speculativeFailedRetries.getCount());
 +        assertEquals(0, ks.metric.speculativeRetries.getCount());
 +        assertEquals(0, ks.metric.speculativeFailedRetries.getCount());
 +        AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime());
 +        executor.maybeTryAdditionalReplicas();
 +        try
 +        {
 +            executor.get();
 +            fail();
 +        }
 +        catch (ReadTimeoutException e)
 +        {
 +            //expected
 +        }
 +        assertEquals(1, cfs.metric.speculativeRetries.getCount());
 +        assertEquals(1, cfs.metric.speculativeFailedRetries.getCount());
 +        assertEquals(1, ks.metric.speculativeRetries.getCount());
 +        assertEquals(1, ks.metric.speculativeFailedRetries.getCount());
 +    }
 +
 +    public static class MockSinglePartitionReadCommand extends SinglePartitionReadCommand
 +    {
 +        private final long timeout;
 +
 +        MockSinglePartitionReadCommand()
 +        {
 +            this(0);
 +        }
 +
 +        MockSinglePartitionReadCommand(long timeout)
 +        {
-             super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null);
++            super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null);
 +            this.timeout = timeout;
 +        }
 +
 +        @Override
 +        public long getTimeout()
 +        {
 +            return timeout;
 +        }
 +
 +        @Override
 +        public MessageOut createMessage()
 +        {
 +            return new MessageOut(MessagingService.Verb.BATCH_REMOVE)
 +            {
 +                @Override
 +                public int serializedSize(int version)
 +                {
 +                    return 0;
 +                }
 +            };
 +        }
 +
 +    }
 +
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message