cassandra-commits mailing list archives

From bdeggles...@apache.org
Subject [5/9] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Date Fri, 14 Jul 2017 17:53:42 GMT
Merge branch cassandra-2.2 into cassandra-3.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/88d2ac4f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/88d2ac4f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/88d2ac4f

Branch: refs/heads/trunk
Commit: 88d2ac4f2fadba44a9b72286ef924441014a97ba
Parents: 7de853b b08843d
Author: Benjamin Lerer <b.lerer@gmail.com>
Authored: Fri Jul 14 17:14:38 2017 +0200
Committer: Benjamin Lerer <b.lerer@gmail.com>
Committed: Fri Jul 14 17:26:34 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   7 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   3 +-
 src/java/org/apache/cassandra/db/DataRange.java |   5 +
 .../cassandra/db/PartitionRangeReadCommand.java |   7 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +-
 src/java/org/apache/cassandra/db/ReadQuery.java |  12 +
 .../db/SinglePartitionReadCommand.java          |  21 +-
 .../apache/cassandra/db/filter/DataLimits.java  |  63 +++--
 .../apache/cassandra/db/filter/RowFilter.java   |  15 ++
 .../apache/cassandra/service/CacheService.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../service/pager/AbstractQueryPager.java       |   2 +-
 .../service/pager/MultiPartitionPager.java      |   9 +-
 .../cassandra/service/pager/QueryPagers.java    |   2 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |   8 +-
 .../validation/operations/SelectLimitTest.java  | 256 ++++++++++++++++++-
 .../db/rows/UnfilteredRowIteratorsTest.java     |  10 +-
 18 files changed, 382 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fffda7f,bda510f..4a823c9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,65 -1,18 +1,66 @@@
 -2.2.11
 +3.0.15
 + * Make concat work with iterators that have different subsets of columns (CASSANDRA-13482)
 + * Set test.runners based on cores and memory size (CASSANDRA-13078)
 + * Allow different NUMACTL_ARGS to be passed in (CASSANDRA-13557)
 + * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606)
 + * Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
 + * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
 + Merged from 2.2:
-   * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
-   * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
-   * Fix nested Tuples/UDTs validation (CASSANDRA-13646)
+  * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223)
+  * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
+  * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
+  * Fix nested Tuples/UDTs validation (CASSANDRA-13646)
 - * Remove unused max_value_size_in_mb config setting from yaml (CASSANDRA-13625
  
 -
 -2.2.10
 +3.0.14
 + * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172)
 + * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
 + * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)
 + * nodetool scrub/cleanup/upgradesstables exit code is wrong (CASSANDRA-13542)
 + * Fix the reported number of sstable data files accessed per read (CASSANDRA-13120)
 + * Fix schema digest mismatch during rolling upgrades from versions before 3.0.12 (CASSANDRA-13559)
 + * Upgrade JNA version to 4.4.0 (CASSANDRA-13072)
 + * Interned ColumnIdentifiers should use minimal ByteBuffers (CASSANDRA-13533)
 + * ReverseIndexedReader may drop rows during 2.1 to 3.0 upgrade (CASSANDRA-13525)
 + * Fix repair process violating start/end token limits for small ranges (CASSANDRA-13052)
 + * Add storage port options to sstableloader (CASSANDRA-13518)
 + * Properly handle quoted index names in cqlsh DESCRIBE output (CASSANDRA-12847)
 + * Avoid reading static row twice from old format sstables (CASSANDRA-13236)
 + * Fix NPE in StorageService.excise() (CASSANDRA-13163)
 + * Expire OutboundTcpConnection messages by a single Thread (CASSANDRA-13265)
 + * Fail repair if insufficient responses received (CASSANDRA-13397)
 + * Fix SSTableLoader fail when the loaded table contains dropped columns (CASSANDRA-13276)
 + * Avoid name clashes in CassandraIndexTest (CASSANDRA-13427)
 + * Handling partially written hint files (CASSANDRA-12728)
 + * Interrupt replaying hints on decommission (CASSANDRA-13308)
 + * Fix schema version calculation for rolling upgrades (CASSANDRA-13441)
 +Merged from 2.2:
   * Nodes started with join_ring=False should be able to serve requests when authentication is enabled (CASSANDRA-11381)
   * cqlsh COPY FROM: increment error count only for failures, not for attempts (CASSANDRA-13209)
 - * nodetool upgradesstables should upgrade system tables (CASSANDRA-13119)
 +
 +3.0.13
 + * Make reading of range tombstones more reliable (CASSANDRA-12811)
 + * Fix startup problems due to schema tables not completely flushed (CASSANDRA-12213)
 + * Fix view builder bug that can filter out data on restart (CASSANDRA-13405)
 + * Fix 2i page size calculation when there are no regular columns (CASSANDRA-13400)
 + * Fix the conversion of 2.X expired rows without regular column data (CASSANDRA-13395)
 + * Fix hint delivery when using ext+internal IPs with prefer_local enabled (CASSANDRA-13020)
 + * Fix possible NPE on upgrade to 3.0/3.X in case of IO errors (CASSANDRA-13389)
 + * Legacy deserializer can create empty range tombstones (CASSANDRA-13341)
 + * Use the Kernel32 library to retrieve the PID on Windows and fix startup checks (CASSANDRA-13333)
 + * Fix code to not exchange schema across major versions (CASSANDRA-13274)
 + * Dropping column results in "corrupt" SSTable (CASSANDRA-13337)
 + * Bugs handling range tombstones in the sstable iterators (CASSANDRA-13340)
 + * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
 + * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
 + * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)
 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305)
 + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238)
 + * Legacy caching options can prevent 3.0 upgrade (CASSANDRA-13384)
 + * Nodetool upgradesstables/scrub/compact ignores system tables (CASSANDRA-13410)
 + * Fix NPE issue in StorageService (CASSANDRA-13060)
 +Merged from 2.2:
   * Avoid starting gossiper in RemoveTest (CASSANDRA-13407)
   * Fix weightedSize() for row-cache reported by JMX and NodeTool (CASSANDRA-13393)
 - * Fix JVM metric paths (CASSANDRA-13103)
   * Honor truststore-password parameter in cassandra-stress (CASSANDRA-12773)
   * Discard in-flight shadow round responses (CASSANDRA-12653)
   * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e82f1d3,2e52eb2..f720330
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1461,37 -1712,284 +1461,38 @@@ public class ColumnFamilyStore implemen
          return data.getUncompacting();
      }
  
 -    public ColumnFamily getColumnFamily(DecoratedKey key,
 -                                        Composite start,
 -                                        Composite finish,
 -                                        boolean reversed,
 -                                        int limit,
 -                                        long timestamp)
 +    public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec)
      {
 -        return getColumnFamily(QueryFilter.getSliceFilter(key, name, start, finish, reversed, limit, timestamp));
 +        // We can use the cached value only if we know that no data it doesn't contain could be covered
 +        // by the query filter, that is if:
 +        //   1) either the whole partition is cached
 +        //   2) or we can ensure than any data the filter selects is in the cached partition
 +
 +        // We can guarantee that a partition is fully cached if the number of rows it contains is less than
 +        // what we're caching. Wen doing that, we should be careful about expiring cells: we should count
 +        // something expired that wasn't when the partition was cached, or we could decide that the whole
 +        // partition is cached when it's not. This is why we use CachedPartition#cachedLiveRows.
 +        if (cached.cachedLiveRows() < metadata.params.caching.rowsPerPartitionToCache())
 +            return true;
 +
 +        // If the whole partition isn't cached, then we must guarantee that the filter cannot select data that
 +        // is not in the cache. We can guarantee that if either the filter is a "head filter" and the cached
 +        // partition has more live rows that queried (where live rows refers to the rows that are live now),
 +        // or if we can prove that everything the filter selects is in the cached partition based on its content.
-         return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec)) || filter.isFullyCoveredBy(cached);
++        return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec, filter.selectsAllPartition()))
++                || filter.isFullyCoveredBy(cached);
      }
  
 -    /**
 -     * Fetch the row and columns given by filter.key if it is in the cache; if not, read it from disk and cache it
 -     *
 -     * If row is cached, and the filter given is within its bounds, we return from cache, otherwise from disk
 -     *
 -     * If row is not cached, we figure out what filter is "biggest", read that from disk, then
 -     * filter the result and either cache that or return it.
 -     *
 -     * @param cfId the column family to read the row from
 -     * @param filter the columns being queried.
 -     * @return the requested data for the filter provided
 -     */
 -    private ColumnFamily getThroughCache(UUID cfId, QueryFilter filter)
 +    public int gcBefore(int nowInSec)
      {
 -        assert isRowCacheEnabled()
 -               : String.format("Row cache is not enabled on table [" + name + "]");
 -
 -        RowCacheKey key = new RowCacheKey(metadata.ksAndCFName, filter.key);
 +        return nowInSec - metadata.params.gcGraceSeconds;
 +    }
  
 -        // attempt a sentinel-read-cache sequence.  if a write invalidates our sentinel, we'll return our
 -        // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862
 -        // TODO: don't evict entire rows on writes (#2864)
 -        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
 -        if (cached != null)
 -        {
 -            if (cached instanceof RowCacheSentinel)
 -            {
 -                // Some other read is trying to cache the value, just do a normal non-caching read
 -                Tracing.trace("Row cache miss (race)");
 -                metric.rowCacheMiss.inc();
 -                return getTopLevelColumns(filter, Integer.MIN_VALUE);
 -            }
 -
 -            ColumnFamily cachedCf = (ColumnFamily)cached;
 -            if (isFilterFullyCoveredBy(filter.filter, cachedCf, filter.timestamp))
 -            {
 -                metric.rowCacheHit.inc();
 -                Tracing.trace("Row cache hit");
 -                ColumnFamily result = filterColumnFamily(cachedCf, filter);
 -                metric.updateSSTableIterated(0);
 -                return result;
 -            }
 -
 -            metric.rowCacheHitOutOfRange.inc();
 -            Tracing.trace("Ignoring row cache as cached value could not satisfy query");
 -            return getTopLevelColumns(filter, Integer.MIN_VALUE);
 -        }
 -
 -        metric.rowCacheMiss.inc();
 -        Tracing.trace("Row cache miss");
 -        RowCacheSentinel sentinel = new RowCacheSentinel();
 -        boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel);
 -        ColumnFamily data = null;
 -        ColumnFamily toCache = null;
 -        try
 -        {
 -            // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing
 -            if (metadata.getCaching().rowCache.cacheFullPartitions())
 -            {
 -                data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE);
 -                toCache = data;
 -                Tracing.trace("Populating row cache with the whole partition");
 -                if (sentinelSuccess && toCache != null)
 -                    CacheService.instance.rowCache.replace(key, sentinel, toCache);
 -                return filterColumnFamily(data, filter);
 -            }
 -
 -            // Otherwise, if we want to cache the result of the query we're about to do, we must make sure this query
 -            // covers what needs to be cached. And if the user filter does not satisfy that, we sometimes extend said
 -            // filter so we can populate the cache but only if:
 -            //   1) we can guarantee it is a strict extension, i.e. that we will still fetch the data asked by the user.
 -            //   2) the extension does not make us query more than getRowsPerPartitionToCache() (as a mean to limit the
 -            //      amount of extra work we'll do on a user query for the purpose of populating the cache).
 -            //
 -            // In practice, we can only guarantee those 2 points if the filter is one that queries the head of the
 -            // partition (and if that filter actually counts CQL3 rows since that's what we cache and it would be
 -            // bogus to compare the filter count to the 'rows to cache' otherwise).
 -            if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator))
 -            {
 -                SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter;
 -                int rowsToCache = metadata.getCaching().rowCache.rowsToCache;
 -
 -                SliceQueryFilter cacheSlice = readFilterForCache();
 -                QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp);
 -
 -                // If the filter count is less than the number of rows cached, we simply extend it to make sure we do cover the
 -                // number of rows to cache, and if that count is greater than the number of rows to cache, we simply filter what
 -                // needs to be cached afterwards.
 -                if (sliceFilter.count < rowsToCache)
 -                {
 -                    toCache = getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
 -                    if (toCache != null)
 -                    {
 -                        Tracing.trace("Populating row cache ({} rows cached)", cacheSlice.lastCounted());
 -                        data = filterColumnFamily(toCache, filter);
 -                    }
 -                }
 -                else
 -                {
 -                    data = getTopLevelColumns(filter, Integer.MIN_VALUE);
 -                    if (data != null)
 -                    {
 -                        // The filter limit was greater than the number of rows to cache. But, if the filter had a non-empty
 -                        // finish bound, we may have gotten less than what needs to be cached, in which case we shouldn't cache it
 -                        // (otherwise a cache hit would assume the whole partition is cached which is not the case).
 -                        if (sliceFilter.finish().isEmpty() || sliceFilter.lastCounted() >= rowsToCache)
 -                        {
 -                            toCache = filterColumnFamily(data, cacheFilter);
 -                            Tracing.trace("Caching {} rows (out of {} requested)", cacheSlice.lastCounted(), sliceFilter.count);
 -                        }
 -                        else
 -                        {
 -                            Tracing.trace("Not populating row cache, not enough rows fetched ({} fetched but {} required for the cache)", sliceFilter.lastCounted(), rowsToCache);
 -                        }
 -                    }
 -                }
 -
 -                if (sentinelSuccess && toCache != null)
 -                    CacheService.instance.rowCache.replace(key, sentinel, toCache);
 -                return data;
 -            }
 -            else
 -            {
 -                Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition");
 -                return getTopLevelColumns(filter, Integer.MIN_VALUE);
 -            }
 -        }
 -        finally
 -        {
 -            if (sentinelSuccess && toCache == null)
 -                invalidateCachedRow(key);
 -        }
 -    }
 -
 -    public SliceQueryFilter readFilterForCache()
 -    {
 -        // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable.
 -        return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size());
 -    }
 -
 -    public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now)
 -    {
 -        // We can use the cached value only if we know that no data it doesn't contain could be covered
 -        // by the query filter, that is if:
 -        //   1) either the whole partition is cached
 -        //   2) or we can ensure than any data the filter selects are in the cached partition
 -
 -        // When counting rows to decide if the whole row is cached, we should be careful with expiring
 -        // columns: if we use a timestamp newer than the one that was used when populating the cache, we might
 -        // end up deciding the whole partition is cached when it's really not (just some rows expired since the
 -        // cf was cached). This is the reason for Integer.MIN_VALUE below.
 -        boolean wholePartitionCached = cachedCf.liveCQL3RowCount(Integer.MIN_VALUE) < metadata.getCaching().rowCache.rowsToCache;
 -
 -        // Contrarily to the "wholePartitionCached" check above, we do want isFullyCoveredBy to take the
 -        // timestamp of the query into account when dealing with expired columns. Otherwise, we could think
 -        // the cached partition has enough live rows to satisfy the filter when it doesn't because some
 -        // are now expired.
 -        return wholePartitionCached || filter.isFullyCoveredBy(cachedCf, now);
 -    }
 -
 -    public int gcBefore(long now)
 -    {
 -        return (int) (now / 1000) - metadata.getGcGraceSeconds();
 -    }
 -
 -    /**
 -     * get a list of columns starting from a given column, in a specified order.
 -     * only the latest version of a column is returned.
 -     * @return null if there is no data and no tombstones; otherwise a ColumnFamily
 -     */
 -    public ColumnFamily getColumnFamily(QueryFilter filter)
 -    {
 -        assert name.equals(filter.getColumnFamilyName()) : filter.getColumnFamilyName();
 -
 -        ColumnFamily result = null;
 -
 -        long start = System.nanoTime();
 -        try
 -        {
 -            int gcBefore = gcBefore(filter.timestamp);
 -            if (isRowCacheEnabled())
 -            {
 -                assert !isIndex(); // CASSANDRA-5732
 -                UUID cfId = metadata.cfId;
 -
 -                ColumnFamily cached = getThroughCache(cfId, filter);
 -                if (cached == null)
 -                {
 -                    logger.trace("cached row is empty");
 -                    return null;
 -                }
 -
 -                result = cached;
 -            }
 -            else
 -            {
 -                ColumnFamily cf = getTopLevelColumns(filter, gcBefore);
 -
 -                if (cf == null)
 -                    return null;
 -
 -                result = removeDeletedCF(cf, gcBefore);
 -            }
 -
 -            removeDroppedColumns(result);
 -
 -            if (filter.filter instanceof SliceQueryFilter)
 -            {
 -                // Log the number of tombstones scanned on single key queries
 -                metric.tombstoneScannedHistogram.update(((SliceQueryFilter) filter.filter).lastTombstones());
 -                metric.liveScannedHistogram.update(((SliceQueryFilter) filter.filter).lastLive());
 -            }
 -        }
 -        finally
 -        {
 -            metric.readLatency.addNano(System.nanoTime() - start);
 -        }
 -
 -        return result;
 -    }
 -
 -    /**
 -     *  Filter a cached row, which will not be modified by the filter, but may be modified by throwing out
 -     *  tombstones that are no longer relevant.
 -     *  The returned column family won't be thread safe.
 -     */
 -    ColumnFamily filterColumnFamily(ColumnFamily cached, QueryFilter filter)
 -    {
 -        if (cached == null)
 -            return null;
 -
 -        ColumnFamily cf = cached.cloneMeShallow(ArrayBackedSortedColumns.factory, filter.filter.isReversed());
 -        int gcBefore = gcBefore(filter.timestamp);
 -        filter.collateOnDiskAtom(cf, filter.getIterator(cached), gcBefore);
 -        return removeDeletedCF(cf, gcBefore);
 -    }
 -
 -    public Set<SSTableReader> getUnrepairedSSTables()
 -    {
 -        Set<SSTableReader> unRepairedSSTables = new HashSet<>(getSSTables());
 -        Iterator<SSTableReader> sstableIterator = unRepairedSSTables.iterator();
 -        while(sstableIterator.hasNext())
 -        {
 -            SSTableReader sstable = sstableIterator.next();
 -            if (sstable.isRepaired())
 -                sstableIterator.remove();
 -        }
 -        return unRepairedSSTables;
 -    }
 -
 -    public Set<SSTableReader> getRepairedSSTables()
 -    {
 -        Set<SSTableReader> repairedSSTables = new HashSet<>(getSSTables());
 -        Iterator<SSTableReader> sstableIterator = repairedSSTables.iterator();
 -        while(sstableIterator.hasNext())
 -        {
 -            SSTableReader sstable = sstableIterator.next();
 -            if (!sstable.isRepaired())
 -                sstableIterator.remove();
 -        }
 -        return repairedSSTables;
 -    }
 -
 -    @SuppressWarnings("resource")
 -    public RefViewFragment selectAndReference(Function<View, List<SSTableReader>> filter)
 -    {
 -        long failingSince = -1L;
 -        while (true)
 +    @SuppressWarnings("resource")
 +    public RefViewFragment selectAndReference(Function<View, Iterable<SSTableReader>> filter)
 +    {
 +        long failingSince = -1L;
 +        while (true)
          {
              ViewFragment view = select(filter);
              Refs<SSTableReader> refs = Refs.tryRef(view.sstables);
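
The order of checks in the merged isFilterFullyCoveredBy shown in this hunk can be read as a small decision function. The Java sketch below is not Cassandra code. It assumes plain ints and booleans in place of CachedPartition, ClusteringIndexFilter and DataLimits, and every class and parameter name in it is hypothetical. It only mirrors the structure of the method: a partition with fewer cached live rows than the caching limit is treated as fully cached, otherwise the filter must be a head filter with enough live data or be provably covered by the cached content.

```java
// Hypothetical stand-in for the decision in ColumnFamilyStore.isFilterFullyCoveredBy.
// None of these names exist in Cassandra; the real method works on CachedPartition,
// ClusteringIndexFilter and DataLimits rather than on primitives.
final class RowCacheCoverageSketch
{
    static boolean isFilterFullyCoveredBy(int cachedLiveRows,
                                          int rowsPerPartitionToCache,
                                          boolean isHeadFilter,
                                          boolean hasEnoughLiveData,
                                          boolean coveredByCachedContent)
    {
        // Fewer live rows than the cache limit: the whole partition is in the cache.
        if (cachedLiveRows < rowsPerPartitionToCache)
            return true;

        // Otherwise the filter must not be able to select anything outside the cache.
        return (isHeadFilter && hasEnoughLiveData) || coveredByCachedContent;
    }

    public static void main(String[] args)
    {
        // Whole partition cached: covered regardless of the filter.
        System.out.println(isFilterFullyCoveredBy(3, 10, false, false, false));
        // Cache is full, head filter, enough live rows for the limit.
        System.out.println(isFilterFullyCoveredBy(10, 10, true, true, false));
        // Cache is full, head filter, but not enough live rows and no content proof.
        System.out.println(isFilterFullyCoveredBy(10, 10, true, false, false));
    }
}
```

In the merged code the hasEnoughLiveData input additionally depends on filter.selectsAllPartition(), which is the argument this merge adds to the call. The sketch folds that into a single boolean.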

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/DataRange.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/DataRange.java
index ffe041e,1e6f8c8..d2f9c76
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@@ -179,30 -119,16 +179,35 @@@ public class DataRang
          return keyRange.contains(pos);
      }
  
 -    public int getLiveCount(ColumnFamily data, long now)
 +    /**
 +     * Whether this {@code DataRange} queries everything (has no restriction neither on the
 +     * partition queried, nor within the queried partition).
 +     *
 +     * @return Whether this {@code DataRange} queries everything.
 +     */
 +    public boolean isUnrestricted()
 +    {
 +        return startKey().isMinimum() && stopKey().isMinimum() && clusteringIndexFilter.selectsAllPartition();
++    }
++
++    public boolean selectsAllPartition()
+     {
 -        return columnFilter instanceof SliceQueryFilter
 -             ? ((SliceQueryFilter)columnFilter).lastCounted()
 -             : columnFilter.getLiveCount(data, now);
++        return clusteringIndexFilter.selectsAllPartition();
      }
  
 -    public boolean selectsFullRowFor(ByteBuffer rowKey)
 +    /**
 +     * The clustering index filter to use for the provided key.
 +     * <p>
 +     * This may or may not be the same filter for all keys (that is, paging range
 +     * use a different filter for their start key).
 +     *
 +     * @param key the partition key for which we want the clustering index filter.
 +     *
 +     * @return the clustering filter to use for {@code key}.
 +     */
 +    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
      {
 -        return selectFullRow;
 +        return clusteringIndexFilter;
      }
  
      /**
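
The hunk above leaves DataRange with two related predicates: isUnrestricted(), which also requires the partition key bounds to be open, and the new selectsAllPartition(), which only looks at the clustering index filter. The Java sketch below illustrates the difference under simplifying assumptions: the key bounds and the clustering filter are reduced to three booleans, and the class and field names are hypothetical rather than taken from Cassandra.

```java
// Hypothetical model of the two predicates on DataRange; not the real class.
final class DataRangeSketch
{
    private final boolean startKeyIsMinimum;          // stands in for startKey().isMinimum()
    private final boolean stopKeyIsMinimum;           // stands in for stopKey().isMinimum()
    private final boolean filterSelectsAllPartition;  // stands in for clusteringIndexFilter.selectsAllPartition()

    DataRangeSketch(boolean startKeyIsMinimum, boolean stopKeyIsMinimum, boolean filterSelectsAllPartition)
    {
        this.startKeyIsMinimum = startKeyIsMinimum;
        this.stopKeyIsMinimum = stopKeyIsMinimum;
        this.filterSelectsAllPartition = filterSelectsAllPartition;
    }

    // No restriction on which partitions are read, nor within a partition.
    boolean isUnrestricted()
    {
        return startKeyIsMinimum && stopKeyIsMinimum && filterSelectsAllPartition;
    }

    // No restriction within a partition; the partition key range may still be bounded.
    boolean selectsAllPartition()
    {
        return filterSelectsAllPartition;
    }

    public static void main(String[] args)
    {
        // A bounded key range that still reads each selected partition in full.
        DataRangeSketch bounded = new DataRangeSketch(false, true, true);
        System.out.println(bounded.selectsAllPartition());
        System.out.println(bounded.isUnrestricted());
    }
}
```

A range can therefore select whole partitions without being unrestricted, which is why the merge exposes the second predicate separately.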

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 1cf332d,0000000..617e2f5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@@ -1,341 -1,0 +1,346 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Optional;
 +
 +import com.google.common.collect.Iterables;
 +
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.filter.*;
- import org.apache.cassandra.db.lifecycle.SSTableSet;
 +import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.BaseRowIterator;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.index.Index;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 +import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.pager.*;
 +import org.apache.cassandra.thrift.ThriftResultsMerger;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * A read command that selects a (part of a) range of partitions.
 + */
 +public class PartitionRangeReadCommand extends ReadCommand
 +{
 +    protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
 +
 +    private final DataRange dataRange;
 +    private int oldestUnrepairedTombstone = Integer.MAX_VALUE;
 +
 +    public PartitionRangeReadCommand(boolean isDigest,
 +                                     int digestVersion,
 +                                     boolean isForThrift,
 +                                     CFMetaData metadata,
 +                                     int nowInSec,
 +                                     ColumnFilter columnFilter,
 +                                     RowFilter rowFilter,
 +                                     DataLimits limits,
 +                                     DataRange dataRange,
 +                                     Optional<IndexMetadata> index)
 +    {
 +        super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits);
 +        this.dataRange = dataRange;
 +        this.index = index;
 +    }
 +
 +    public PartitionRangeReadCommand(CFMetaData metadata,
 +                                     int nowInSec,
 +                                     ColumnFilter columnFilter,
 +                                     RowFilter rowFilter,
 +                                     DataLimits limits,
 +                                     DataRange dataRange,
 +                                     Optional<IndexMetadata> index)
 +    {
 +        this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange, index);
 +    }
 +
 +    /**
 +     * Creates a new read command that query all the data in the table.
 +     *
 +     * @param metadata the table to query.
 +     * @param nowInSec the time in seconds to use are "now" for this query.
 +     *
 +     * @return a newly created read command that queries everything in the table.
 +     */
 +    public static PartitionRangeReadCommand allDataRead(CFMetaData metadata, int nowInSec)
 +    {
 +        return new PartitionRangeReadCommand(metadata,
 +                                             nowInSec,
 +                                             ColumnFilter.all(metadata),
 +                                             RowFilter.NONE,
 +                                             DataLimits.NONE,
 +                                             DataRange.allData(metadata.partitioner),
 +                                             Optional.empty());
 +    }
 +
 +    public DataRange dataRange()
 +    {
 +        return dataRange;
 +    }
 +
 +    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
 +    {
 +        return dataRange.clusteringIndexFilter(key);
 +    }
 +
 +    public boolean isNamesQuery()
 +    {
 +        return dataRange.isNamesQuery();
 +    }
 +
 +    public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range)
 +    {
 +        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range), index);
 +    }
 +
 +    public PartitionRangeReadCommand copy()
 +    {
 +        return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange(), index);
 +    }
 +
 +    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
 +    {
 +        return new PartitionRangeReadCommand(metadata(), nowInSec(), columnFilter(), rowFilter(), newLimits, dataRange(), index);
 +    }
 +
 +    public long getTimeout()
 +    {
 +        return DatabaseDescriptor.getRangeRpcTimeout();
 +    }
 +
 +    public boolean selectsKey(DecoratedKey key)
 +    {
 +        if (!dataRange().contains(key))
 +            return false;
 +
 +        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().getKeyValidator());
 +    }
 +
 +    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
 +    {
 +        if (clustering == Clustering.STATIC_CLUSTERING)
 +            return !columnFilter().fetchedColumns().statics.isEmpty();
 +
 +        if (!dataRange().clusteringIndexFilter(key).selects(clustering))
 +            return false;
 +        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
 +    }
 +
 +    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +    {
 +        return StorageProxy.getRangeSlice(this, consistency);
 +    }
 +
 +    public QueryPager getPager(PagingState pagingState, int protocolVersion)
 +    {
 +        return new PartitionRangeQueryPager(this, pagingState, protocolVersion);
 +    }
 +
 +    protected void recordLatency(TableMetrics metric, long latencyNanos)
 +    {
 +        metric.rangeLatency.addNano(latencyNanos);
 +    }
 +
 +    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup)
 +    {
 +        ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(dataRange().keyRange()));
 +        Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), dataRange().keyRange().getString(metadata().getKeyValidator()));
 +
 +        // fetch data from current memtable, historical memtables, and SSTables in the correct order.
 +        final List<UnfilteredPartitionIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
 +
 +        try
 +        {
 +            for (Memtable memtable : view.memtables)
 +            {
 +                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
 +                Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange(), isForThrift());
 +                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, iter.getMinLocalDeletionTime());
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
 +            }
 +
 +            SSTableReadsListener readCountUpdater = newReadCountUpdater();
 +            for (SSTableReader sstable : view.sstables)
 +            {
 +                @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
 +                UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), isForThrift(), readCountUpdater);
 +                iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, metadata(), nowInSec()) : iter);
 +                if (!sstable.isRepaired())
 +                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
 +            }
 +            return iterators.isEmpty() ? EmptyIterators.unfilteredPartition(metadata(), isForThrift())
 +                                       : checkCacheFilter(UnfilteredPartitionIterators.mergeLazily(iterators, nowInSec()), cfs);
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            try
 +            {
 +                FBUtilities.closeAll(iterators);
 +            }
 +            catch (Exception suppressed)
 +            {
 +                e.addSuppressed(suppressed);
 +            }
 +
 +            throw e;
 +        }
 +    }
 +
 +    /**
 +     * Creates a new {@code SSTableReadsListener} to update the SSTables read counts.
 +     * @return a new {@code SSTableReadsListener} to update the SSTables read counts.
 +     */
 +    private static SSTableReadsListener newReadCountUpdater()
 +    {
 +        return new SSTableReadsListener()
 +                {
 +                    @Override
 +                    public void onScanningStarted(SSTableReader sstable)
 +                    {
 +                        sstable.incrementReadCount();
 +                    }
 +                };
 +    }
 +
 +    @Override
 +    protected int oldestUnrepairedTombstone()
 +    {
 +        return oldestUnrepairedTombstone;
 +    }
 +
 +    private UnfilteredPartitionIterator checkCacheFilter(UnfilteredPartitionIterator iter, final ColumnFamilyStore cfs)
 +    {
 +        class CacheFilter extends Transformation
 +        {
 +            @Override
 +            public BaseRowIterator applyToPartition(BaseRowIterator iter)
 +            {
 +                // Note that we rely on the fact that until we actually advance 'iter', no really costly operation is actually done
 +                // (except for reading the partition key from the index file) due to the call to mergeLazily in queryStorage.
 +                DecoratedKey dk = iter.partitionKey();
 +
 +                // Check if this partition is in the rowCache and, if it is, whether it covers our filter
 +                CachedPartition cached = cfs.getRawCachedPartition(dk);
 +                ClusteringIndexFilter filter = dataRange().clusteringIndexFilter(dk);
 +
 +                if (cached != null && cfs.isFilterFullyCoveredBy(filter, limits(), cached, nowInSec()))
 +                {
 +                    // We won't use 'iter' so close it now.
 +                    iter.close();
 +
 +                    return filter.getUnfilteredRowIterator(columnFilter(), cached);
 +                }
 +
 +                return iter;
 +            }
 +        }
 +        return Transformation.apply(iter, new CacheFilter());
 +    }
 +
 +    public MessageOut<ReadCommand> createMessage(int version)
 +    {
 +        return dataRange().isPaging()
 +             ? new MessageOut<>(MessagingService.Verb.PAGED_RANGE, this, pagedRangeSerializer)
 +             : new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, rangeSliceSerializer);
 +    }
 +
 +    protected void appendCQLWhereClause(StringBuilder sb)
 +    {
 +        if (dataRange.isUnrestricted() && rowFilter().isEmpty())
 +            return;
 +
 +        sb.append(" WHERE ");
 +        // We put the row filter first because the data range can end by "ORDER BY"
 +        if (!rowFilter().isEmpty())
 +        {
 +            sb.append(rowFilter());
 +            if (!dataRange.isUnrestricted())
 +                sb.append(" AND ");
 +        }
 +        if (!dataRange.isUnrestricted())
 +            sb.append(dataRange.toCQLString(metadata()));
 +    }
 +
 +    /**
 +     * Allows post-processing of the result of the query after it has been reconciled on the coordinator
 +     * but before it is passed to the CQL layer to return the ResultSet.
 +     *
 +     * See CASSANDRA-8717 for why this exists.
 +     */
 +    public PartitionIterator postReconciliationProcessing(PartitionIterator result)
 +    {
 +        ColumnFamilyStore cfs = Keyspace.open(metadata().ksName).getColumnFamilyStore(metadata().cfName);
 +        Index index = getIndex(cfs);
 +        return index == null ? result : index.postProcessorFor(this).apply(result, this);
 +    }
 +
 +    @Override
++    public boolean selectsFullPartition()
++    {
++        return dataRange.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns();
++    }
++
++    @Override
 +    public String toString()
 +    {
 +        return String.format("Read(%s.%s columns=%s rowfilter=%s limits=%s %s)",
 +                             metadata().ksName,
 +                             metadata().cfName,
 +                             columnFilter(),
 +                             rowFilter(),
 +                             limits(),
 +                             dataRange().toString(metadata()));
 +    }
 +
 +    protected void serializeSelection(DataOutputPlus out, int version) throws IOException
 +    {
 +        DataRange.serializer.serialize(dataRange(), out, version, metadata());
 +    }
 +
 +    protected long selectionSerializedSize(int version)
 +    {
 +        return DataRange.serializer.serializedSize(dataRange(), version, metadata());
 +    }
 +
 +    private static class Deserializer extends SelectionDeserializer
 +    {
 +        public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index)
 +        throws IOException
 +        {
 +            DataRange range = DataRange.serializer.deserialize(in, version, metadata);
 +            return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range, index);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 64da428,cd86336..76180cc
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -264,699 -95,55 +264,699 @@@ public abstract class ReadCommand imple
          return this;
      }
  
 -    public String getColumnFamilyName()
 +    /**
 +     * Sets the digest version, for when digest for that command is requested.
 +     * <p>
 +     * Note that we allow setting this independently of setting the command as a digest query as
 +     * this allows us to use the command as a carrier of the digest version even if we only call
 +     * setIsDigestQuery on some copy of it.
 +     *
 +     * @param digestVersion the version to use for the digest if this command is used for a digest query.
 +     * @return this read command.
 +     */
 +    public ReadCommand setDigestVersion(int digestVersion)
      {
 -        return cfName;
 +        this.digestVersion = digestVersion;
 +        return this;
      }
  
 +    /**
 +     * Whether this query is for thrift or not.
 +     *
 +     * @return whether this query is for thrift.
 +     */
 +    public boolean isForThrift()
 +    {
 +        return isForThrift;
 +    }
 +
 +    /**
 +     * The clustering index filter this command uses for the provided key.
 +     * <p>
 +     * Note that this method should only be called on a key actually queried by this command
 +     * and in practice, this will almost always return the same filter, but for the sake of
 +     * paging, the filter on the first key of a range command might be slightly different.
 +     *
 +     * @param key a partition key queried by this command.
 +     *
 +     * @return the {@code ClusteringIndexFilter} to use for the partition of key {@code key}.
 +     */
 +    public abstract ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key);
 +
 +    /**
 +     * Returns a copy of this command.
 +     *
 +     * @return a copy of this command.
 +     */
      public abstract ReadCommand copy();
  
 -    public abstract Row getRow(Keyspace keyspace);
 +    protected abstract UnfilteredPartitionIterator queryStorage(ColumnFamilyStore cfs, ReadOrderGroup orderGroup);
 +
 +    protected abstract int oldestUnrepairedTombstone();
  
 -    public abstract IDiskAtomFilter filter();
 +    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
 +    {
 +        return isDigestQuery()
 +             ? ReadResponse.createDigestResponse(iterator, this)
 +             : ReadResponse.createDataResponse(iterator, this);
 +    }
  
 -    public String getKeyspace()
 +    public long indexSerializedSize(int version)
      {
 -        return ksName;
 +        if (index.isPresent())
 +            return IndexMetadata.serializer.serializedSize(index.get(), version);
 +        else
 +            return 0;
      }
  
 -    // maybeGenerateRetryCommand is used to generate a retry for short reads
 -    public ReadCommand maybeGenerateRetryCommand(RowDataResolver resolver, Row row)
 +    public Index getIndex(ColumnFamilyStore cfs)
      {
 -        return null;
 +        // if we've already consulted the index manager, and it returned a valid index
 +        // the result should be cached here.
 +        if (index.isPresent())
 +            return cfs.indexManager.getIndex(index.get());
 +
 +        // if no cached index is present, but we've already consulted the index manager
 +        // then no registered index is suitable for this command, so just return null.
 +        if (indexManagerQueried)
 +            return null;
 +
 +        // do the lookup, set the flag to indicate so and cache the result if not null
 +        Index selected = cfs.indexManager.getBestIndexFor(this);
 +        indexManagerQueried = true;
 +
 +        if (selected == null)
 +            return null;
 +
 +        index = Optional.of(selected.getIndexMetadata());
 +        return selected;
      }
  
 -    // maybeTrim removes columns from a response that is too long
 -    public Row maybeTrim(Row row)
 +    /**
 +     * If the index manager for the CFS determines that there's an applicable
 +     * 2i that can be used to execute this command, call its (optional)
 +     * validation method to check that nothing in this command's parameters
 +     * violates the implementation specific validation rules.
 +     */
 +    public void maybeValidateIndex()
      {
 -        return row;
 +        Index index = getIndex(Keyspace.openAndGetStore(metadata));
 +        if (null != index)
 +            index.validate(this);
      }
  
 -    public long getTimeout()
 +    /**
 +     * Executes this command on the local host.
 +     *
 +     * @param orderGroup the operation group spanning this command
 +     *
 +     * @return an iterator over the result of executing this command locally.
 +     */
 +    @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary
 +                                  // iterators created inside the try as long as we do close the original resultIterator), or by closing the result.
 +    public UnfilteredPartitionIterator executeLocally(ReadOrderGroup orderGroup)
      {
 -        return DatabaseDescriptor.getReadRpcTimeout();
 +        long startTimeNanos = System.nanoTime();
 +
 +        ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata());
 +        Index index = getIndex(cfs);
 +
 +        Index.Searcher searcher = null;
 +        if (index != null)
 +        {
 +            if (!cfs.indexManager.isIndexQueryable(index))
 +                throw new IndexNotAvailableException(index);
 +
 +            searcher = index.searcherFor(this);
 +            Tracing.trace("Executing read on {}.{} using index {}", cfs.metadata.ksName, cfs.metadata.cfName, index.getIndexMetadata().name);
 +        }
 +
 +        UnfilteredPartitionIterator resultIterator = searcher == null
 +                                         ? queryStorage(cfs, orderGroup)
 +                                         : searcher.search(orderGroup);
 +
 +        try
 +        {
 +            resultIterator = withMetricsRecording(withoutPurgeableTombstones(resultIterator, cfs), cfs.metric, startTimeNanos);
 +
 +            // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
 +            // no point in checking it again.
 +            RowFilter updatedFilter = searcher == null
 +                                    ? rowFilter()
 +                                    : index.getPostIndexQueryFilter(rowFilter());
 +
 +            // TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
 +            // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
 +            // would be more efficient (the sooner we discard stuff we know we don't care about, the less useless
 +            // processing we do on it).
-             return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec());
++            return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec(), selectsFullPartition());
 +        }
 +        catch (RuntimeException | Error e)
 +        {
 +            resultIterator.close();
 +            throw e;
 +        }
      }
 -}
  
 -class ReadCommandSerializer implements IVersionedSerializer<ReadCommand>
 -{
 -    public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +    protected abstract void recordLatency(TableMetrics metric, long latencyNanos);
 +
 +    public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +    {
 +        return UnfilteredPartitionIterators.filter(executeLocally(orderGroup), nowInSec());
 +    }
 +
 +    public ReadOrderGroup startOrderGroup()
 +    {
 +        return ReadOrderGroup.forCommand(this);
 +    }
 +
 +    /**
 +     * Wraps the provided iterator so that metrics on what is scanned by the command are recorded.
 +     * This also logs a warning or throws a TombstoneOverwhelmingException if appropriate.
 +     */
 +    private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
 +    {
 +        class MetricRecording extends Transformation<UnfilteredRowIterator>
 +        {
 +            private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
 +            private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
 +
 +            private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 +
 +            private int liveRows = 0;
 +            private int tombstones = 0;
 +
 +            private DecoratedKey currentKey;
 +
 +            @Override
 +            public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
 +            {
 +                currentKey = iter.partitionKey();
 +                return Transformation.apply(iter, this);
 +            }
 +
 +            @Override
 +            public Row applyToStatic(Row row)
 +            {
 +                return applyToRow(row);
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                if (row.hasLiveData(ReadCommand.this.nowInSec()))
 +                    ++liveRows;
 +
 +                for (Cell cell : row.cells())
 +                {
 +                    if (!cell.isLive(ReadCommand.this.nowInSec()))
 +                        countTombstone(row.clustering());
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
 +            {
 +                countTombstone(marker.clustering());
 +                return marker;
 +            }
 +
 +            private void countTombstone(ClusteringPrefix clustering)
 +            {
 +                ++tombstones;
 +                if (tombstones > failureThreshold && respectTombstoneThresholds)
 +                {
 +                    String query = ReadCommand.this.toCQLString();
 +                    Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
 +                    throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
 +                }
 +            }
 +
 +            @Override
 +            public void onClose()
 +            {
 +                recordLatency(metric, System.nanoTime() - startTimeNanos);
 +
 +                metric.tombstoneScannedHistogram.update(tombstones);
 +                metric.liveScannedHistogram.update(liveRows);
 +
 +                boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
 +                if (warnTombstones)
 +                {
 +                    String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
 +                    ClientWarn.instance.warn(msg);
 +                    logger.warn(msg);
 +                }
 +
 +                Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
 +            }
 +        };
 +
 +        return Transformation.apply(iter, new MetricRecording());
 +    }
 +
 +    /**
 +     * Creates a message for this command.
 +     */
 +    public abstract MessageOut<ReadCommand> createMessage(int version);
 +
 +    protected abstract void appendCQLWhereClause(StringBuilder sb);
 +
 +    // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it
 +    // can save us some bandwidth, and avoid making us throw a TombstoneOverwhelmingException for purgeable tombstones (which
 +    // are to some extent an artefact of compaction lagging behind and hence counting them is somewhat unintuitive).
 +    protected UnfilteredPartitionIterator withoutPurgeableTombstones(UnfilteredPartitionIterator iterator, ColumnFamilyStore cfs)
 +    {
 +        final boolean isForThrift = iterator.isForThrift();
 +        class WithoutPurgeableTombstones extends PurgeFunction
 +        {
 +            public WithoutPurgeableTombstones()
 +            {
 +                super(isForThrift, nowInSec(), cfs.gcBefore(nowInSec()), oldestUnrepairedTombstone(), cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones());
 +            }
 +
 +            protected Predicate<Long> getPurgeEvaluator()
 +            {
 +                return time -> true;
 +            }
 +        }
 +        return Transformation.apply(iterator, new WithoutPurgeableTombstones());
 +    }
 +
 +    /**
 +     * Recreate the CQL string corresponding to this query.
 +     * <p>
 +     * Note that in general the returned string will not be exactly the original user string, first
 +     * because there isn't always a single syntax for a given query, but also because we don't have
 +     * all the information needed (we know the non-PK columns queried but not the PK ones as internally
 +     * we query them all). So this shouldn't be relied upon too strongly, but it should be good enough for
 +     * debugging purposes, which is what this is for.
 +     */
 +    public String toCQLString()
 +    {
 +        StringBuilder sb = new StringBuilder();
 +        sb.append("SELECT ").append(columnFilter());
 +        sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName);
 +        appendCQLWhereClause(sb);
 +
 +        if (limits() != DataLimits.NONE)
 +            sb.append(' ').append(limits());
 +        return sb.toString();
 +    }
 +
 +    private static class Serializer implements IVersionedSerializer<ReadCommand>
 +    {
 +        private static int digestFlag(boolean isDigest)
 +        {
 +            return isDigest ? 0x01 : 0;
 +        }
 +
 +        private static boolean isDigest(int flags)
 +        {
 +            return (flags & 0x01) != 0;
 +        }
 +
 +        private static int thriftFlag(boolean isForThrift)
 +        {
 +            return isForThrift ? 0x02 : 0;
 +        }
 +
 +        private static boolean isForThrift(int flags)
 +        {
 +            return (flags & 0x02) != 0;
 +        }
 +
 +        private static int indexFlag(boolean hasIndex)
 +        {
 +            return hasIndex ? 0x04 : 0;
 +        }
 +
 +        private static boolean hasIndex(int flags)
 +        {
 +            return (flags & 0x04) != 0;
 +        }
 +
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            assert version >= MessagingService.VERSION_30;
 +
 +            out.writeByte(command.kind.ordinal());
 +            out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
 +            if (command.isDigestQuery())
 +                out.writeUnsignedVInt(command.digestVersion());
 +            CFMetaData.serializer.serialize(command.metadata(), out, version);
 +            out.writeInt(command.nowInSec());
 +            ColumnFilter.serializer.serialize(command.columnFilter(), out, version);
 +            RowFilter.serializer.serialize(command.rowFilter(), out, version);
 +            DataLimits.serializer.serialize(command.limits(), out, version);
 +            if (command.index.isPresent())
 +                IndexMetadata.serializer.serialize(command.index.get(), out, version);
 +
 +            command.serializeSelection(out, version);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            assert version >= MessagingService.VERSION_30;
 +
 +            Kind kind = Kind.values()[in.readByte()];
 +            int flags = in.readByte();
 +            boolean isDigest = isDigest(flags);
 +            boolean isForThrift = isForThrift(flags);
 +            boolean hasIndex = hasIndex(flags);
 +            int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
 +            CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
 +            int nowInSec = in.readInt();
 +            ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata);
 +            RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, version);
 +            Optional<IndexMetadata> index = hasIndex
 +                                            ? deserializeIndexMetadata(in, version, metadata)
 +                                            : Optional.empty();
 +
 +            return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index);
 +        }
 +
 +        private Optional<IndexMetadata> deserializeIndexMetadata(DataInputPlus in, int version, CFMetaData cfm) throws IOException
 +        {
 +            try
 +            {
 +                return Optional.of(IndexMetadata.serializer.deserialize(in, version, cfm));
 +            }
 +            catch (UnknownIndexException e)
 +            {
 +                String message = String.format("Couldn't find a defined index on %s.%s with the id %s. " +
 +                                               "If an index was just created, this is likely due to the schema not " +
 +                                               "being fully propagated. Local read will proceed without using the " +
 +                                               "index. Please wait for schema agreement after index creation.",
 +                                               cfm.ksName, cfm.cfName, e.indexId.toString());
 +                logger.info(message);
 +                return Optional.empty();
 +            }
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version >= MessagingService.VERSION_30;
 +
 +            return 2 // kind + flags
 +                 + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
 +                 + CFMetaData.serializer.serializedSize(command.metadata(), version)
 +                 + TypeSizes.sizeof(command.nowInSec())
 +                 + ColumnFilter.serializer.serializedSize(command.columnFilter(), version)
 +                 + RowFilter.serializer.serializedSize(command.rowFilter(), version)
 +                 + DataLimits.serializer.serializedSize(command.limits(), version)
 +                 + command.selectionSerializedSize(version)
 +                 + command.indexSerializedSize(version);
 +        }
 +    }
 +
 +    private enum LegacyType
 +    {
 +        GET_BY_NAMES((byte)1),
 +        GET_SLICES((byte)2);
 +
 +        public final byte serializedValue;
 +
 +        LegacyType(byte b)
 +        {
 +            this.serializedValue = b;
 +        }
 +
 +        public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
 +        {
 +            return kind == ClusteringIndexFilter.Kind.SLICE
 +                   ? GET_SLICES
 +                   : GET_BY_NAMES;
 +        }
 +
 +        public static LegacyType fromSerializedValue(byte b)
 +        {
 +            return b == 1 ? GET_BY_NAMES : GET_SLICES;
 +        }
 +    }
 +
 +    /**
 +     * Serializer for pre-3.0 RangeSliceCommands.
 +     */
 +    private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand>
      {
 -        out.writeByte(command.commandType.serializedValue);
 -        switch (command.commandType)
 +        public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            assert !rangeCommand.dataRange().isPaging();
 +
 +            // convert pre-3.0 incompatible names filters to slice filters
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            out.writeUTF(metadata.ksName);
 +            out.writeUTF(metadata.cfName);
 +            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from seconds to millis
 +
 +            // begin DiskAtomFilterSerializer.serialize()
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                out.writeByte(1);  // 0 for slices, 1 for names
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out);
 +            }
 +            else
 +            {
 +                out.writeByte(0);  // 0 for slices, 1 for names
 +
 +                // slice filter serialization
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
 +
 +                out.writeBoolean(filter.isReversed());
 +
 +                // limit
 +                DataLimits limits = rangeCommand.limits();
 +                if (limits.isDistinct())
 +                    out.writeInt(1);
 +                else
 +                    out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices()));
 +
 +                int compositesToGroup;
 +                boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
 +                    compositesToGroup = -1;
 +                else if (limits.isDistinct() && !selectsStatics)
 +                    compositesToGroup = -2;  // for DISTINCT queries (CASSANDRA-8490)
 +                else
 +                    compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size();
 +
 +                out.writeInt(compositesToGroup);
 +            }
 +
 +            serializeRowFilter(out, rangeCommand.rowFilter());
 +            AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version);
 +
 +            // maxResults
 +            out.writeInt(rangeCommand.limits().count());
 +
 +            // countCQL3Rows
 +            if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
 +                out.writeBoolean(false);
 +            else
 +                out.writeBoolean(true);
 +
 +            // isPaging
 +            out.writeBoolean(false);
 +        }
 +
 +        public ReadCommand deserialize(DataInputPlus in, int version) throws IOException
 +        {
 +            assert version < MessagingService.VERSION_30;
 +
 +            String keyspace = in.readUTF();
 +            String columnFamily = in.readUTF();
 +
 +            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily);
 +            if (metadata == null)
 +            {
 +                String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily);
 +                throw new UnknownColumnFamilyException(message, null);
 +            }
 +
 +            int nowInSec = (int) (in.readLong() / 1000);  // convert from millis to seconds
 +
 +            ClusteringIndexFilter filter;
 +            ColumnFilter selection;
 +            int compositesToGroup = 0;
 +            int perPartitionLimit = -1;
 +            byte readType = in.readByte();  // 0 for slices, 1 for names
 +            if (readType == 1)
 +            {
 +                Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
 +                selection = selectionAndFilter.left;
 +                filter = selectionAndFilter.right;
 +            }
 +            else
 +            {
 +                Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
 +                filter = p.left;
 +                perPartitionLimit = in.readInt();
 +                compositesToGroup = in.readInt();
 +                selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata);
 +            }
 +
 +            RowFilter rowFilter = deserializeRowFilter(in, metadata);
 +
 +            AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version);
 +            int maxResults = in.readInt();
 +
 +            boolean countCQL3Rows = in.readBoolean();  // countCQL3Rows (only used below to detect DISTINCT queries)
 +            in.readBoolean();  // isPaging (not needed)
 +
 +            boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING));
 +            // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former,
 +            // we can easily detect the case because compositesToGroup is -2 and that's the only case in which it takes that value. The latter is slightly less
 +            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows set to false so we use
 +            // that fact.
 +            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
 +            DataLimits limits;
 +            if (isDistinct)
 +                limits = DataLimits.distinctLimits(maxResults);
 +            else if (compositesToGroup == -1)
 +                limits = DataLimits.thriftLimits(maxResults, perPartitionLimit);
 +            else
 +                limits = DataLimits.cqlLimits(maxResults);
 +
 +            return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty());
 +        }
 +
 +        static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException
 +        {
 +            ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator());
 +            out.writeInt(indexExpressions.size());
 +            for (RowFilter.Expression expression : indexExpressions)
 +            {
 +                ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
 +                expression.operator().writeTo(out);
 +                ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
 +            }
 +        }
 +
 +        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException
 +        {
 +            int numRowFilters = in.readInt();
 +            if (numRowFilters == 0)
 +                return RowFilter.NONE;
 +
 +            RowFilter rowFilter = RowFilter.create(numRowFilters);
 +            for (int i = 0; i < numRowFilters; i++)
 +            {
 +                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
 +                ColumnDefinition column = metadata.getColumnDefinition(columnName);
 +                Operator op = Operator.readFrom(in);
 +                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
 +                rowFilter.add(column, op, indexValue);
 +            }
 +            return rowFilter;
 +        }
 +
 +        static long serializedRowFilterSize(RowFilter rowFilter)
 +        {
 +            long size = TypeSizes.sizeof(0);  // rowFilterCount
 +            for (RowFilter.Expression expression : rowFilter)
 +            {
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                size += TypeSizes.sizeof(0);  // operator int value
 +                size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +            }
 +            return size;
 +        }
 +
 +        public long serializedSize(ReadCommand command, int version)
 +        {
 +            assert version < MessagingService.VERSION_30;
 +            assert command.kind == Kind.PARTITION_RANGE;
 +
 +            PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command;
 +            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
 +            CFMetaData metadata = rangeCommand.metadata();
 +
 +            long size = TypeSizes.sizeof(metadata.ksName);
 +            size += TypeSizes.sizeof(metadata.cfName);
 +            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
 +
 +            size += 1;  // single byte flag: 0 for slices, 1 for names
 +            if (rangeCommand.isNamesQuery())
 +            {
 +                PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns();
 +                ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns);
 +            }
 +            else
 +            {
 +                ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
 +                boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
 +                size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata);
 +                size += TypeSizes.sizeof(filter.isReversed());
 +                size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
 +                size += TypeSizes.sizeof(0); // compositesToGroup
 +            }
 +
 +            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
 +            {
 +                size += TypeSizes.sizeof(0);
 +            }
 +            else
 +            {
 +                ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator());
 +                size += TypeSizes.sizeof(indexExpressions.size());
 +                for (RowFilter.Expression expression : indexExpressions)
 +                {
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
 +                    size += TypeSizes.sizeof(expression.operator().ordinal());
 +                    size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
 +                }
 +            }
 +
 +            size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version);
 +            size += TypeSizes.sizeof(rangeCommand.limits().count());
 +            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
 +            return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
 +        }
 +
 +        static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command)
          {
 -            case GET_BY_NAMES:
 -                SliceByNamesReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            case GET_SLICES:
 -                SliceFromReadCommand.serializer.serialize(command, out, version);
 -                break;
 -            default:
 -                throw new AssertionError();
 +            if (!command.dataRange().isNamesQuery())
 +                return command;
 +
 +            CFMetaData metadata = command.metadata();
 +            if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns()))
 +                return command;
 +
 +            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter;
 +            ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
 +            DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter);
 +            return new PartitionRangeReadCommand(
 +                    command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(),
 +                    command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty());
 +        }
 +
 +        static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata)
 +        {
 +            // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys.
 +            // In that case, we'll basically be querying the first row of the partition, but we must make sure we include
 +            // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise.
 +            if (compositesToGroup == -2)
 +                return ColumnFilter.all(metadata);
 +
 +            // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all
 +            PartitionColumns columns = selectsStatics
 +                                     ? metadata.partitionColumns()
 +                                     : metadata.partitionColumns().withoutStatics();
 +            return ColumnFilter.selectionBuilder().addAll(columns).build();
          }
      }
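
The DISTINCT detection described in the legacy range-command deserializer above can be sketched in isolation. This is a minimal illustration, not the Cassandra implementation: the class `LegacyLimitsSketch`, the enum `LimitKind` and the method `classify` are hypothetical names, and only the two wire values `compositesToGroup` and `countCQL3Rows` are modelled.

```java
// Hypothetical, simplified model of how a pre-3.0 range command's limits are classified.
final class LegacyLimitsSketch
{
    // Assumed stand-ins for the three DataLimits factories chosen in the diff above.
    enum LimitKind { DISTINCT, THRIFT, CQL }

    static LimitKind classify(int compositesToGroup, boolean countCQL3Rows)
    {
        // -2 marks a DISTINCT query on partition keys only. A DISTINCT query that also
        // selects statics is recognised by countCQL3Rows being false on a non-Thrift query.
        boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows);
        if (isDistinct)
            return LimitKind.DISTINCT;

        // -1 is what the serializer writes for Thrift limits (and for dense tables).
        if (compositesToGroup == -1)
            return LimitKind.THRIFT;

        return LimitKind.CQL;
    }

    public static void main(String[] args)
    {
        System.out.println(classify(-2, false)); // DISTINCT on partition keys only
        System.out.println(classify(2, false));  // DISTINCT that also selects statics
        System.out.println(classify(-1, false)); // Thrift-style limits
        System.out.println(classify(2, true));   // ordinary CQL limits
    }
}
```

Note that the check for -2 comes first, so the value of `countCQL3Rows` is irrelevant in that case.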
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadQuery.java
index 178ca7c,0000000..75ba8f5
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@@ -1,141 -1,0 +1,153 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.exceptions.RequestExecutionException;
 +import org.apache.cassandra.service.ClientState;
 +import org.apache.cassandra.service.pager.QueryPager;
 +import org.apache.cassandra.service.pager.PagingState;
 +
 +/**
 + * Generic abstraction for read queries.
 + * <p>
 + * The main implementation of this is {@link ReadCommand} but we have this interface because
 + * {@link SinglePartitionReadCommand.Group} is also considered a "read query" but is not a
 + * {@code ReadCommand}.
 + */
 +public interface ReadQuery
 +{
 +    ReadQuery EMPTY = new ReadQuery()
 +    {
 +        public ReadOrderGroup startOrderGroup()
 +        {
 +            return ReadOrderGroup.emptyGroup();
 +        }
 +
 +        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
 +        {
 +            return EmptyIterators.partition();
 +        }
 +
 +        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +        {
 +            return EmptyIterators.partition();
 +        }
 +
 +        public DataLimits limits()
 +        {
 +            // What we return here doesn't matter much in practice. However, returning DataLimits.NONE means
 +            // "no particular limit", which makes SelectStatement.execute() take the slightly more complex "paging"
 +            // path. Not a big deal but it's easy enough to return a limit of 0 rows which avoids this.
 +            return DataLimits.cqlLimits(0);
 +        }
 +
 +        public QueryPager getPager(PagingState state, int protocolVersion)
 +        {
 +            return QueryPager.EMPTY;
 +        }
 +
 +        public QueryPager getLocalPager()
 +        {
 +            return QueryPager.EMPTY;
 +        }
 +
 +        public boolean selectsKey(DecoratedKey key)
 +        {
 +            return false;
 +        }
 +
 +        public boolean selectsClustering(DecoratedKey key, Clustering clustering)
 +        {
 +            return false;
 +        }
++
++        @Override
++        public boolean selectsFullPartition()
++        {
++            return false;
++        }
 +    };
 +
 +    /**
 +     * Starts a new read operation.
 +     * <p>
 +     * This must be called before {@link #executeInternal} and passed to it to protect the read.
 +     * The returned object <b>must</b> be closed on all paths and it is thus strongly advised to
 +     * use it in a try-with-resources statement.
 +     *
 +     * @return a newly started order group for this {@code ReadQuery}.
 +     */
 +    public ReadOrderGroup startOrderGroup();
 +
 +    /**
 +     * Executes the query at the provided consistency level.
 +     *
 +     * @param consistency the consistency level to achieve for the query.
 +     * @param clientState the {@code ClientState} for the query. In practice, this can be null unless
 +     * {@code consistency} is a serial consistency.
 +     *
 +     * @return the result of the query.
 +     */
 +    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException;
 +
 +    /**
 +     * Execute the query for internal queries (that is, it basically executes the query locally).
 +     *
 +     * @param orderGroup the {@code ReadOrderGroup} protecting the read.
 +     * @return the result of the query.
 +     */
 +    public PartitionIterator executeInternal(ReadOrderGroup orderGroup);
 +
 +    /**
 +     * Returns a pager for the query.
 +     *
 +     * @param pagingState the {@code PagingState} to start from if this is a paging continuation. This can be
 +     * {@code null} if this is the start of paging.
 +     * @param protocolVersion the protocol version to use for the paging state of that pager.
 +     *
 +     * @return a pager for the query.
 +     */
 +    public QueryPager getPager(PagingState pagingState, int protocolVersion);
 +
 +    /**
 +     * The limits for the query.
 +     *
 +     * @return The limits for the query.
 +     */
 +    public DataLimits limits();
 +
 +    /**
 +     * @return true if the read query would select the given key, including checks against the row filter
 +     */
 +    public boolean selectsKey(DecoratedKey key);
 +
 +    /**
 +     * @return true if the read query would select the given clustering, including checks against the row filter
 +     */
 +    public boolean selectsClustering(DecoratedKey key, Clustering clustering);
++
++    /**
++     * Checks if this {@code ReadQuery} selects full partitions, that is, it has no filtering on clustering or regular columns.
++     * @return {@code true} if this {@code ReadQuery} selects full partitions, {@code false} otherwise.
++     */
++    public boolean selectsFullPartition();
 +}
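
The Javadoc for startOrderGroup() above advises closing the returned group on every path via try-with-resources. A minimal sketch of that calling pattern follows. It assumes nothing about the real classes: `FakeOrderGroup` and `FakeReadQuery` are hypothetical stand-ins for ReadOrderGroup and ReadQuery, reduced to what the pattern needs, and the int result stands in for a PartitionIterator.

```java
// Hypothetical stand-ins illustrating the "start group, execute, always close" pattern.
final class OrderGroupSketch
{
    // Stand-in for ReadOrderGroup: it only records whether close() was called.
    static final class FakeOrderGroup implements AutoCloseable
    {
        boolean closed;

        @Override
        public void close()
        {
            closed = true;
        }
    }

    // Stand-in for ReadQuery, reduced to the two methods involved in a local read.
    interface FakeReadQuery
    {
        FakeOrderGroup startOrderGroup();

        int executeInternal(FakeOrderGroup orderGroup);
    }

    // The group is closed when the try block exits, whether normally or by exception.
    static int runLocally(FakeReadQuery query)
    {
        try (FakeOrderGroup orderGroup = query.startOrderGroup())
        {
            return query.executeInternal(orderGroup);
        }
    }

    public static void main(String[] args)
    {
        final FakeOrderGroup group = new FakeOrderGroup();
        FakeReadQuery query = new FakeReadQuery()
        {
            public FakeOrderGroup startOrderGroup() { return group; }
            public int executeInternal(FakeOrderGroup orderGroup) { return 3; }
        };
        System.out.println(runLocally(query) + " rows, group closed: " + group.closed);
    }
}
```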


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

