cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [06/10] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Wed, 30 Aug 2017 16:38:06 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ad1945e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ad1945e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ad1945e

Branch: refs/heads/trunk
Commit: 7ad1945ee7592990027bee4fe6bbfcac72940954
Parents: 6d6081e 7f297bc
Author: Aleksey Yeschenko <aleksey@yeschenko.com>
Authored: Wed Aug 30 17:04:49 2017 +0100
Committer: Aleksey Yeschenko <aleksey@yeschenko.com>
Committed: Wed Aug 30 17:08:53 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/SelectStatement.java        |  16 +-
 .../db/AbstractReadCommandBuilder.java          |   2 +-
 .../cassandra/db/PartitionRangeReadCommand.java | 149 ++++++++++---
 .../org/apache/cassandra/db/ReadCommand.java    | 149 +++++++------
 .../db/SinglePartitionReadCommand.java          | 209 +++++++++++++++----
 .../cassandra/index/SecondaryIndexManager.java  |   9 +-
 .../internal/composites/CompositesSearcher.java |   6 +-
 .../index/internal/keys/KeysSearcher.java       |   3 +-
 .../cassandra/service/AbstractReadExecutor.java |   4 +-
 .../service/pager/PartitionRangeQueryPager.java |   8 +-
 .../cassandra/thrift/CassandraServer.java       |  68 +++---
 test/unit/org/apache/cassandra/Util.java        |  26 +--
 .../apache/cassandra/db/SecondaryIndexTest.java |  10 +-
 .../db/SinglePartitionSliceCommandTest.java     |  45 ++--
 .../cassandra/index/sasi/SASIIndexTest.java     |  46 ++--
 .../cassandra/io/sstable/SSTableReaderTest.java |   2 +-
 17 files changed, 479 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index c4aee3a,aca9e1f..d848eff
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,6 +1,12 @@@
 -3.0.15
 +3.11.1
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Fix race condition in read command serialization (CASSANDRA-13363)
 - * Enable segement creation before recovering commitlogs (CASSANDRA-13587)
   * Fix AssertionError in short read protection (CASSANDRA-13747)
   * Don't skip corrupted sstables on startup (CASSANDRA-13620)
   * Fix the merging of cells with different user type versions (CASSANDRA-13776)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/AbstractReadCommandBuilder.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index d0487a3,9e557e0..f7b6660
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@@ -121,41 -129,74 +130,108 @@@ public class PartitionRangeReadCommand 
          return dataRange.isNamesQuery();
      }
  
 -    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
 +    /**
 +     * Returns an equivalent command but that only queries data within the provided range.
 +     *
 +     * @param range the sub-range to restrict the command to. This method <b>assumes</b> that this is a proper sub-range
 +     * of the command this is applied to.
 +     * @param isRangeContinuation whether {@code range} is a direct continuation of whatever previous range we have
 +     * queried. This matters for the {@code DataLimits} that may contain states when we do paging and in the context of
 +     * parallel queries: that state only makes sense if the range queried is indeed the follow-up of whatever range we've
 +     * previously queried (that yielded said state). In practice this means that ranges for which {@code isRangeContinuation}
 +     * is false may have to be slightly pessimistic when counting data and may include a little bit more than necessary, and
 +     * this should be dealt with post-query (in the case of {@code StorageProxy.getRangeSlice()}, which uses this method
 +     * for replica queries, this is dealt with by re-counting results on the coordinator). Note that if this is the
 +     * first range we queried, then the {@code DataLimits} will have no state and the value of this parameter doesn't
 +     * matter.
 +     */
 +    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range, boolean isRangeContinuation)
      {
-         DataRange newRange = dataRange().forSubRange(range);
 +        // If we're not a continuation of whatever range we've previously queried, we should ignore the states of the
 +        // DataLimits as it's either useless, or misleading. This is particularly important for GROUP BY queries, where
 +        // DataLimits.CQLGroupByLimits.GroupByAwareCounter assumes that if GroupingState.hasClustering(), then we're in
 +        // the middle of a group, but we can't make that assumption if we query and range "in advance" of where we are
 +        // on the ring.
-         DataLimits newLimits = isRangeContinuation ? limits() : limits().withoutState();
-         return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, newRange, index);
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
 -                                             limits(),
++                                             isRangeContinuation ? limits() : limits().withoutState(),
+                                              dataRange().forSubRange(range),
+                                              indexMetadata());
      }
  
      public PartitionRangeReadCommand copy()
      {
-         return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              dataRange(),
+                                              indexMetadata());
+     }
+ 
+     public PartitionRangeReadCommand copyAsDigestQuery()
+     {
+         return new PartitionRangeReadCommand(true,
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              dataRange(),
+                                              indexMetadata());
+     }
+ 
++    public ReadCommand withUpdatedLimit(DataLimits newLimits)
++    {
++        return new PartitionRangeReadCommand(isDigestQuery(),
++                                             digestVersion(),
++                                             isForThrift(),
++                                             metadata(),
++                                             nowInSec(),
++                                             columnFilter(),
++                                             rowFilter(),
++                                             newLimits,
++                                             dataRange(),
++                                             indexMetadata());
++    }
++
+     public PartitionRangeReadCommand withUpdatedDataRange(DataRange newDataRange)
+     {
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              limits(),
+                                              newDataRange,
+                                              indexMetadata());
      }
  
-     public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
+     public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange)
      {
-         return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
+         return new PartitionRangeReadCommand(isDigestQuery(),
+                                              digestVersion(),
+                                              isForThrift(),
+                                              metadata(),
+                                              nowInSec(),
+                                              columnFilter(),
+                                              rowFilter(),
+                                              newLimits,
+                                              newDataRange,
+                                              indexMetadata());
      }
  
      public long getTimeout()
@@@ -196,7 -237,8 +272,8 @@@
          metric.rangeLatency.addNano(latencyNanos);
      }
  
-     protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
+     @VisibleForTesting
 -    public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
++    public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadExecutionController executionController)
      {
          ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
          Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 050546c,66985b6..54389f0
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -322,7 -329,12 +341,12 @@@ public abstract class ReadCommand exten
       */
      public abstract ReadCommand copy();
  
+     /**
+      * Returns a copy of this command with isDigestQuery set to true.
+      */
+     public abstract ReadCommand copyAsDigestQuery();
+ 
 -    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 +    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadExecutionController executionController);
  
      protected abstract int oldestUnrepairedTombstone();
  
@@@ -684,9 -630,9 +705,9 @@@
              out.writeInt(command.nowInSec());
              ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
              RowFilter.serializer.serialize(command.rowFilter(), out, version);
 -            DataLimits.serializer.serialize(command.limits(), out, version);
 +            DataLimits.serializer.serialize(command.limits(), out, version, command.metadata.comparator);
-             if (command.index.isPresent())
-                 IndexMetadata.serializer.serialize(command.index.get(), out, version);
+             if (null != command.index)
+                 IndexMetadata.serializer.serialize(command.index, out, version);
  
              command.serializeSelection(out, version);
          }
@@@ -705,10 -651,8 +726,8 @@@
              int nowInSec = in.readInt();
              ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
              RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
 -            DataLimits limits = DataLimits.serializer.deserialize(in, version);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, version,  metadata.comparator);
-             Optional<IndexMetadata> index = hasIndex
-                                             ? deserializeIndexMetadata(in, version, metadata)
-                                             : Optional.empty();
+             IndexMetadata index = hasIndex ? deserializeIndexMetadata(in, version, metadata) : null;
  
              return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
          }
@@@ -721,12 -665,13 +740,12 @@@
              }
              catch (UnknownIndexException e)
              {
 -                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
 -                                               "If an index was just created, this is likely due to the schema not " +
 -                                               "being fully propagated. Local read will proceed without using the " +
 -                                               "index. Please wait for schema agreement after index creation.",
 -                                               cfm.ksName, cfm.cfName, e.indexId.toString());
 -                logger.info(message);
 +                logger.info("Couldn't find a defined index on {}.{} with the id {}. " +
 +                            "If an index was just created, this is likely due to the schema not " +
 +                            "being fully propagated. Local read will proceed without using the " +
 +                            "index. Please wait for schema agreement after index creation.",
 +                            cfm.ksName, cfm.cfName, e.indexId);
-                 return Optional.empty();
+                 return null;
              }
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 5d93c65,00464ca..c7080e7
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -268,7 -316,82 +319,97 @@@ public class SinglePartitionReadComman
  
      public SinglePartitionReadCommand copy()
      {
-         return new SinglePartitionReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+         return new SinglePartitionReadCommand(isDigestQuery(),
+                                               digestVersion(),
+                                               isForThrift(),
+                                               metadata(),
+                                               nowInSec(),
+                                               columnFilter(),
+                                               rowFilter(),
+                                               limits(),
+                                               partitionKey(),
+                                               clusteringIndexFilter(),
+                                               indexMetadata());
+     }
+ 
+     public SinglePartitionReadCommand copyAsDigestQuery()
+     {
+         return new SinglePartitionReadCommand(true,
+                                               digestVersion(),
+                                               isForThrift(),
+                                               metadata(),
+                                               nowInSec(),
+                                               columnFilter(),
+                                               rowFilter(),
+                                               limits(),
+                                               partitionKey(),
+                                               clusteringIndexFilter(),
+                                               indexMetadata());
+     }
+ 
++    public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
++    {
++        return new SinglePartitionReadCommand(isDigestQuery(),
++                                              digestVersion(),
++                                              isForThrift(),
++                                              metadata(),
++                                              nowInSec(),
++                                              columnFilter(),
++                                              rowFilter(),
++                                              newLimits,
++                                              partitionKey(),
++                                              clusteringIndexFilter(),
++                                              indexMetadata());
++    }
++
+     public SinglePartitionReadCommand withUpdatedClusteringIndexFilter(ClusteringIndexFilter filter)
+     {
+         return new SinglePartitionReadCommand(isDigestQuery(),
+                                               digestVersion(),
+                                               isForThrift(),
+                                               metadata(),
+                                               nowInSec(),
+                                               columnFilter(),
+                                               rowFilter(),
+                                               limits(),
+                                               partitionKey(),
+                                               filter,
+                                               indexMetadata());
+     }
+ 
+     static SinglePartitionReadCommand legacySliceCommand(boolean isDigest,
+                                                          int digestVersion,
+                                                          CFMetaData metadata,
+                                                          int nowInSec,
+                                                          ColumnFilter columnFilter,
+                                                          DataLimits limits,
+                                                          DecoratedKey partitionKey,
+                                                          ClusteringIndexSliceFilter filter)
+     {
+         // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+         return new SinglePartitionReadCommand(isDigest,
+                                               digestVersion,
+                                               true,
+                                               metadata,
+                                               nowInSec,
+                                               columnFilter,
+                                               RowFilter.NONE,
+                                               limits,
+                                               partitionKey,
+                                               filter,
+                                               null);
+     }
+ 
+     static SinglePartitionReadCommand legacyNamesCommand(boolean isDigest,
+                                                          int digestVersion,
+                                                          CFMetaData metadata,
+                                                          int nowInSec,
+                                                          ColumnFilter columnFilter,
+                                                          DecoratedKey partitionKey,
+                                                          ClusteringIndexNamesFilter filter)
+     {
+         // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift
+         return new SinglePartitionReadCommand(isDigest, digestVersion, true, metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, partitionKey, filter,null);
      }
  
      public DecoratedKey partitionKey()
@@@ -334,26 -457,12 +475,12 @@@
                        lastReturned == null ? clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
      }
  
-     public SinglePartitionReadCommand withUpdatedLimit(DataLimits newLimits)
-     {
-         return new SinglePartitionReadCommand(isDigestQuery(),
-                                               digestVersion(),
-                                               isForThrift(),
-                                               metadata(),
-                                               nowInSec(),
-                                               columnFilter(),
-                                               rowFilter(),
-                                               newLimits,
-                                               partitionKey,
-                                               clusteringIndexFilter);
-     }
- 
 -    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
      {
 -        return StorageProxy.read(Group.one(this), consistency, clientState);
 +        return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime);
      }
  
 -    public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
 +    public SinglePartitionPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
      {
          return getPager(this, pagingState, protocolVersion);
      }
@@@ -449,7 -558,7 +576,7 @@@
                  final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
  
                  @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
-                 UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, executionController);
 -                UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
++                UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, executionController);
                  try
                  {
                      // Use a custom iterator instead of DataLimits to avoid stopping the original iterator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/index/SecondaryIndexManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index e777620,f8a7c66..2007800
--- a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@@ -109,65 -108,43 +109,67 @@@ public class CompositesSearcher extend
                          nextEntry = index.decodeEntry(indexKey, indexHits.next());
                      }
  
 -                    // Gather all index hits belonging to the same partition and query the data for those hits.
 -                    // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
 -                    // 1 read per index hit. However, this basically mean materializing all hits for a partition
 -                    // in memory so we should consider adding some paging mechanism. However, index hits should
 -                    // be relatively small so it's much better than the previous code that was materializing all
 -                    // *data* for a given partition.
 -                    BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
 -                    List<IndexEntry> entries = new ArrayList<>();
 +                    SinglePartitionReadCommand dataCmd;
                      DecoratedKey partitionKey = index.baseCfs.decorateKey(nextEntry.indexedKey);
 -
 -                    while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
 +                    List<IndexEntry> entries = new ArrayList<>();
 +                    if (isStaticColumn())
                      {
 -                        // We're queried a slice of the index, but some hits may not match some of the clustering column constraints
 -                        if (isMatchingEntry(partitionKey, nextEntry, command))
 -                        {
 -                            clusterings.add(nextEntry.indexedEntryClustering);
 -                            entries.add(nextEntry);
 +                        // The index hit may not match the command key constraint
 +                        if (!isMatchingEntry(partitionKey, nextEntry, command)) {
 +                            nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
 +                            continue;
                          }
  
 +                        // If the index is on a static column, we just need to do a full read on the partition.
 +                        // Note that we want to re-use the command.columnFilter() in case of future change.
 +                        dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
 +                                                                    command.nowInSec(),
 +                                                                    command.columnFilter(),
 +                                                                    RowFilter.NONE,
 +                                                                    DataLimits.NONE,
 +                                                                    partitionKey,
 +                                                                    new ClusteringIndexSliceFilter(Slices.ALL, false));
 +                        entries.add(nextEntry);
                          nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
                      }
 +                    else
 +                    {
 +                        // Gather all index hits belonging to the same partition and query the data for those hits.
 +                        // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing
 +                        // 1 read per index hit. However, this basically mean materializing all hits for a partition
 +                        // in memory so we should consider adding some paging mechanism. However, index hits should
 +                        // be relatively small so it's much better than the previous code that was materializing all
 +                        // *data* for a given partition.
 +                        BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.baseCfs.getComparator());
 +                        while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey))
 +                        {
 +                            // We're queried a slice of the index, but some hits may not match some of the clustering column constraints
 +                            if (isMatchingEntry(partitionKey, nextEntry, command))
 +                            {
 +                                clusterings.add(nextEntry.indexedEntryClustering);
 +                                entries.add(nextEntry);
 +                            }
 +
 +                            nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null;
 +                        }
  
 -                    // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
 -                    if (clusterings.isEmpty())
 -                        continue;
 +                        // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing
 +                        if (clusterings.isEmpty())
 +                            continue;
 +
 +                        // Query the gathered index hits. We still need to filter stale hits from the resulting query.
 +                        ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
-                         dataCmd = SinglePartitionReadCommand.create(index.baseCfs.metadata,
++                        dataCmd = SinglePartitionReadCommand.create(isForThrift(),
++                                                                    index.baseCfs.metadata,
 +                                                                    command.nowInSec(),
 +                                                                    command.columnFilter(),
 +                                                                    command.rowFilter(),
 +                                                                    DataLimits.NONE,
 +                                                                    partitionKey,
-                                                                     filter);
++                                                                    filter,
++                                                                    null);
 +                    }
  
 -                    // Query the gathered index hits. We still need to filter stale hits from the resulting query.
 -                    ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings.build(), false);
 -                    SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(),
 -                                                                                           index.baseCfs.metadata,
 -                                                                                           command.nowInSec(),
 -                                                                                           command.columnFilter(),
 -                                                                                           command.rowFilter(),
 -                                                                                           DataLimits.NONE,
 -                                                                                           partitionKey,
 -                                                                                           filter,
 -                                                                                           null);
                      @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either
                      // by the next caller of next, or through closing this iterator is this come before.
                      UnfilteredRowIterator dataIter =

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/index/internal/keys/KeysSearcher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
index 5ba13a4,ea79017..e6ad3d4
--- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java
@@@ -17,16 -17,14 +17,12 @@@
   */
  package org.apache.cassandra.service.pager;
  
- import java.util.Optional;
 -import org.slf4j.Logger;
 -import org.slf4j.LoggerFactory;
--
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.filter.DataLimits;
  import org.apache.cassandra.db.rows.Row;
  import org.apache.cassandra.dht.*;
  import org.apache.cassandra.exceptions.RequestExecutionException;
- import org.apache.cassandra.index.Index;
- import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.transport.ProtocolVersion;
  
  /**
   * Pages a PartitionRangeReadCommand.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/thrift/CassandraServer.java
index e71f512,cb74b15..f43b7a4
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@@ -1547,17 -1519,17 +1547,17 @@@ public class CassandraServer implement
                  ColumnFilter columns = makeColumnFilter(metadata, column_parent, predicate);
                  ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate);
                  DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate);
-                 PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                               0,
-                                                                               true,
-                                                                               metadata,
-                                                                               nowInSec,
-                                                                               columns,
-                                                                               ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
-                                                                               limits,
-                                                                               new DataRange(bounds, filter),
-                                                                               Optional.empty());
+ 
+                 PartitionRangeReadCommand cmd =
+                     PartitionRangeReadCommand.create(true,
+                                                      metadata,
+                                                      nowInSec,
+                                                      columns,
+                                                      ThriftConversion.rowFilterFromThrift(metadata, range.row_filter),
+                                                      limits,
+                                                      new DataRange(bounds, filter));
+ 
 -                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
 +                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
                  {
                      assert results != null;
                      return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
@@@ -1640,19 -1611,19 +1640,19 @@@
                  ClusteringIndexFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false);
                  DataLimits limits = getLimits(range.count, true, Integer.MAX_VALUE);
                  Clustering pageFrom = metadata.isSuper()
 -                                    ? new Clustering(start_column)
 +                                    ? Clustering.make(start_column)
                                      : LegacyLayout.decodeCellName(metadata, start_column).clustering;
-                 PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                               0,
-                                                                               true,
-                                                                               metadata,
-                                                                               nowInSec,
-                                                                               ColumnFilter.all(metadata),
-                                                                               RowFilter.NONE,
-                                                                               limits,
-                                                                               new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
-                                                                               Optional.empty());
+ 
+                 PartitionRangeReadCommand cmd =
+                     PartitionRangeReadCommand.create(true,
+                                                      metadata,
+                                                      nowInSec,
+                                                      ColumnFilter.all(metadata),
+                                                      RowFilter.NONE,
+                                                      limits,
+                                                      new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true));
+ 
 -                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
 +                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
                  {
                      return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
                  }
@@@ -1735,24 -1705,20 +1735,20 @@@
              ColumnFilter columns = makeColumnFilter(metadata, column_parent, column_predicate);
              ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate);
              DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate);
-             PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false,
-                                                                           0,
-                                                                           true,
-                                                                           metadata,
-                                                                           nowInSec,
-                                                                           columns,
-                                                                           ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
-                                                                           limits,
-                                                                           new DataRange(bounds, filter),
-                                                                           Optional.empty());
-             // If there's a secondary index that the command can use, have it validate
-             // the request parameters. Note that as a side effect, if a viable Index is
-             // identified by the CFS's index manager, it will be cached in the command
-             // and serialized during distribution to replicas in order to avoid performing
-             // further lookups.
+ 
+             PartitionRangeReadCommand cmd =
+                 PartitionRangeReadCommand.create(true,
+                                                  metadata,
+                                                  nowInSec,
+                                                  columns,
+                                                  ThriftConversion.rowFilterFromThrift(metadata, index_clause.expressions),
+                                                  limits,
+                                                  new DataRange(bounds, filter));
+ 
+             // If there's a secondary index that the command can use, have it validate the request parameters.
              cmd.maybeValidateIndex();
  
 -            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
 +            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
              {
                  return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
              }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/Util.java
index 3fa24d7,d758efe..a3ad653
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@@ -668,33 -627,9 +668,9 @@@ public class Uti
  
      public static UnfilteredPartitionIterator executeLocally(PartitionRangeReadCommand command,
                                                               ColumnFamilyStore cfs,
 -                                                             ReadOrderGroup orderGroup)
 +                                                             ReadExecutionController controller)
      {
-         return new InternalPartitionRangeReadCommand(command).queryStorageInternal(cfs, controller);
-     }
- 
-     private static final class InternalPartitionRangeReadCommand extends PartitionRangeReadCommand
-     {
- 
-         private InternalPartitionRangeReadCommand(PartitionRangeReadCommand original)
-         {
-             super(original.isDigestQuery(),
-                   original.digestVersion(),
-                   original.isForThrift(),
-                   original.metadata(),
-                   original.nowInSec(),
-                   original.columnFilter(),
-                   original.rowFilter(),
-                   original.limits(),
-                   original.dataRange(),
-                   Optional.empty());
-         }
- 
-         private UnfilteredPartitionIterator queryStorageInternal(ColumnFamilyStore cfs,
-                                                                  ReadExecutionController controller)
-         {
-             return queryStorage(cfs, controller);
-         }
 -        return command.queryStorage(cfs, orderGroup);
++        return command.queryStorage(cfs, controller);
      }
  
      public static Closeable markDirectoriesUnwriteable(ColumnFamilyStore cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
index a037d90,2457c4a..f2100db
--- a/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
+++ b/test/unit/org/apache/cassandra/db/SecondaryIndexTest.java
@@@ -115,9 -118,8 +115,9 @@@ public class SecondaryIndexTes
                                        .filterOn("birthdate", Operator.EQ, 1L)
                                        .build();
  
-         Index.Searcher searcher = cfs.indexManager.getBestIndexFor(rc).searcherFor(rc);
+         Index.Searcher searcher = rc.index().searcherFor(rc);
 -        try (ReadOrderGroup orderGroup = rc.startOrderGroup(); UnfilteredPartitionIterator pi = searcher.search(orderGroup))
 +        try (ReadExecutionController executionController = rc.executionController();
 +             UnfilteredPartitionIterator pi = searcher.search(executionController))
          {
              assertTrue(pi.hasNext());
              pi.next().close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ad1945e/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 3c09c93,02b642e..b056da1
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@@ -117,15 -114,16 +117,16 @@@ public class SinglePartitionSliceComman
  
          ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(v));
          ByteBuffer zero = ByteBufferUtil.bytes(0);
 -        Slices slices = Slices.with(cfm.comparator, Slice.make(Slice.Bound.inclusiveStartOf(zero), Slice.Bound.inclusiveEndOf(zero)));
 +        Slices slices = Slices.with(cfm.comparator, Slice.make(ClusteringBound.inclusiveStartOf(zero), ClusteringBound.inclusiveEndOf(zero)));
          ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(slices, false);
-         ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                           FBUtilities.nowInSeconds(),
-                                                           columnFilter,
-                                                           RowFilter.NONE,
-                                                           DataLimits.NONE,
-                                                           key,
-                                                           sliceFilter);
+         ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                             cfm,
+                                                             FBUtilities.nowInSeconds(),
+                                                             columnFilter,
+                                                             RowFilter.NONE,
+                                                             DataLimits.NONE,
+                                                             key,
+                                                             sliceFilter);
  
          DataOutputBuffer out = new DataOutputBuffer((int) ReadCommand.legacyReadCommandSerializer.serializedSize(cmd, MessagingService.VERSION_21));
          ReadCommand.legacyReadCommandSerializer.serialize(cmd, out, MessagingService.VERSION_21);
@@@ -175,16 -167,17 +176,17 @@@
  
          ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
          ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.NONE, false);
-         ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                          FBUtilities.nowInSeconds(),
-                                                          columnFilter,
-                                                          RowFilter.NONE,
-                                                          DataLimits.NONE,
-                                                          key,
-                                                          sliceFilter);
+         ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                             cfm,
+                                                             FBUtilities.nowInSeconds(),
+                                                             columnFilter,
+                                                             RowFilter.NONE,
+                                                             DataLimits.NONE,
+                                                             key,
+                                                             sliceFilter);
  
          // check raw iterator for static cell
 -        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
 +        try (ReadExecutionController executionController = cmd.executionController(); UnfilteredPartitionIterator pi = cmd.executeLocally(executionController))
          {
              checkForS(pi);
          }
@@@ -231,15 -224,16 +233,16 @@@
          DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1"));
  
          ColumnFilter columnFilter = ColumnFilter.selection(PartitionColumns.of(s));
 -        Slice slice = Slice.make(Slice.Bound.BOTTOM, Slice.Bound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
 +        Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.inclusiveEndOf(ByteBufferUtil.bytes("i1")));
          ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfm.comparator, slice), false);
-         ReadCommand cmd = new SinglePartitionReadCommand(false, MessagingService.VERSION_30, true, cfm,
-                                                          FBUtilities.nowInSeconds(),
-                                                          columnFilter,
-                                                          RowFilter.NONE,
-                                                          DataLimits.NONE,
-                                                          key,
-                                                          sliceFilter);
+         ReadCommand cmd = SinglePartitionReadCommand.create(true,
+                                                             cfm,
+                                                             FBUtilities.nowInSeconds(),
+                                                             columnFilter,
+                                                             RowFilter.NONE,
+                                                             DataLimits.NONE,
+                                                             key,
+                                                             sliceFilter);
  
          String ret = cmd.toCQLString();
          Assert.assertNotNull(ret);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message