cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pa...@apache.org
Subject [5/8] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Mon, 25 Sep 2017 06:30:20 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e3d56ec
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e3d56ec
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e3d56ec

Branch: refs/heads/trunk
Commit: 3e3d56ecd41f04a31a60a4811702873d9aa57aad
Parents: 0e5c84a 68bdf45
Author: Paulo Motta <paulo@apache.org>
Authored: Mon Sep 25 01:01:31 2017 -0500
Committer: Paulo Motta <paulo@apache.org>
Committed: Mon Sep 25 01:01:47 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  |   5 +-
 .../org/apache/cassandra/db/ReadCommand.java    |   3 +-
 .../db/SinglePartitionReadCommand.java          |   9 +-
 .../apache/cassandra/db/filter/DataLimits.java  | 111 ++++++++++++-------
 .../db/partitions/CachedBTreePartition.java     |   3 +-
 .../apache/cassandra/db/rows/AbstractRow.java   |   6 +-
 src/java/org/apache/cassandra/db/rows/Row.java  |   7 +-
 .../cassandra/db/view/ViewUpdateGenerator.java  |   6 +-
 .../composites/ClusteringColumnIndex.java       |   5 +-
 .../internal/composites/PartitionKeyIndex.java  |   4 +-
 .../apache/cassandra/service/DataResolver.java  |   6 +-
 .../apache/cassandra/service/StorageProxy.java  |  12 +-
 .../service/pager/AbstractQueryPager.java       |   4 +-
 .../cassandra/service/pager/QueryPagers.java    |   2 +-
 .../org/apache/cassandra/cql3/CachingBench.java |   3 +-
 .../cassandra/cql3/GcCompactionBench.java       |   3 +-
 .../apache/cassandra/cql3/GcCompactionTest.java |   3 +-
 .../apache/cassandra/cql3/ViewComplexTest.java  |  65 ++++++++++-
 .../org/apache/cassandra/cql3/ViewTest.java     |   2 +
 .../apache/cassandra/db/RangeTombstoneTest.java |  35 ++++--
 .../db/compaction/CompactionsPurgeTest.java     |   3 +-
 22 files changed, 226 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index eb2ccc0,9cba02b..8fc72fc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,15 -1,6 +1,16 @@@
 -3.0.15
 +3.11.1
+  * Handle limit correctly on tables with strict liveness (CASSANDRA-13883)
 - * Fix missing original update in TriggerExecutor (CASSANDRA-13894)
 + * AbstractTokenTreeBuilder#serializedSize returns wrong value when there is a single leaf and overflow collisions (CASSANDRA-13869)
 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
 + * BTree.Builder memory leak (CASSANDRA-13754)
 + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
   * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043)
   * Improve short read protection performance (CASSANDRA-13794)
   * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index bb4d5e8,160b104..4f3c66f
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -462,7 -449,8 +462,8 @@@ public abstract class ReadCommand exten
              private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
              private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();
  
 -            private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName);
 +            private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().ksName);
+             private final boolean enforceStrictLiveness = metadata.enforceStrictLiveness();
  
              private int liveRows = 0;
              private int tombstones = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c7080e7,7a66eca..7564571
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -574,9 -556,10 +574,10 @@@ public class SinglePartitionReadComman
              try
              {
                  final int rowsToCache = metadata().params.caching.rowsPerPartitionToCache();
+                 final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();
  
                  @SuppressWarnings("resource") // we close on exception or upon closing the result of this method
 -                UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp);
 +                UnfilteredRowIterator iter = fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, executionController);
                  try
                  {
                      // Use a custom iterator instead of DataLimits to avoid stopping the original iterator
@@@ -1195,45 -1153,26 +1196,49 @@@
          {
              // Note that the only difference between the command in a group must be the partition key on which
              // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
 -            return commands.get(0).startOrderGroup();
 +            return commands.get(0).executionController();
          }
  
 -        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +        public PartitionIterator executeInternal(ReadExecutionController controller)
          {
 -            List<PartitionIterator> partitions = new ArrayList<>(commands.size());
 -            for (SinglePartitionReadCommand cmd : commands)
 -                partitions.add(cmd.executeInternal(orderGroup));
 -
+             // Note that the only difference between the command in a group must be the partition key on which
+             // they applied.
+             boolean enforceStrictLiveness = commands.get(0).metadata().enforceStrictLiveness();
 -            // Because we only have enforce the limit per command, we need to enforce it globally.
 -            return limits.filter(PartitionIterators.concat(partitions),
 +            return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
                                   nowInSec,
-                                  selectsFullPartitions);
+                                  selectsFullPartitions,
+                                  enforceStrictLiveness);
          }
  
 -        public QueryPager getPager(PagingState pagingState, int protocolVersion)
 +        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
 +        {
 +            return executeLocally(executionController, true);
 +        }
 +
 +        /**
 +         * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
 +         *
 +         * @param executionController - the {@code ReadExecutionController} protecting the read.
 +         * @param sort - whether to sort the inner commands by partition key, required for merging the iterator
 +         *               later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
 +         *               because in this case it is safe to do so as there is no merging involved and we don't want to
 +         *               change the old behavior which was to not sort by partition.
 +         *
 +         * @return - the iterator that can be used to retrieve the query result.
 +         */
 +        private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
 +        {
 +            List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(commands.size());
 +            for (SinglePartitionReadCommand cmd : commands)
 +                partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController)));
 +
 +            if (sort)
 +                Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
 +
 +            return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
 +        }
 +
 +        public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
          {
              if (commands.size() == 1)
                  return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java
index 0c8cd37,6b74293..6d7df16
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@@ -20,10 -20,8 +20,11 @@@ package org.apache.cassandra.db.filter
  import java.io.IOException;
  import java.nio.ByteBuffer;
  
+ import org.apache.cassandra.config.CFMetaData;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.aggregation.GroupMaker;
 +import org.apache.cassandra.db.aggregation.GroupingState;
 +import org.apache.cassandra.db.aggregation.AggregationSpecification;
  import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.transform.BasePartitions;
@@@ -139,18 -110,10 +140,21 @@@ public abstract class DataLimit
  
      public abstract DataLimits forShortReadRetry(int toFetch);
  
 +    /**
 +     * Creates a <code>DataLimits</code> instance to be used for paginating internally GROUP BY queries.
 +     *
 +     * @param state the <code>GroupMaker</code> state
 +     * @return a <code>DataLimits</code> instance to be used for paginating internally GROUP BY queries
 +     */
 +    public DataLimits forGroupByInternalPaging(GroupingState state)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
-     public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData);
+     public abstract boolean hasEnoughLiveData(CachedPartition cached,
+                                               int nowInSec,
+                                               boolean countPartitionsWithOnlyStaticData,
+                                               boolean enforceStrictLiveness);
  
      /**
       * Returns a new {@code Counter} for this limits.
@@@ -210,18 -180,9 +227,20 @@@
  
      public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
      {
 +        protected final int nowInSec;
 +        protected final boolean assumeLiveData;
++        private final boolean enforceStrictLiveness;
 +
          // false means we do not propagate our stop signals onto the iterator, we only count
          private boolean enforceLimits = true;
  
-         protected Counter(int nowInSec, boolean assumeLiveData)
++        protected Counter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
 +        {
 +            this.nowInSec = nowInSec;
 +            this.assumeLiveData = assumeLiveData;
++            this.enforceStrictLiveness = enforceStrictLiveness;
 +        }
 +
          public Counter onlyCount()
          {
              this.enforceLimits = false;
@@@ -276,11 -222,6 +295,11 @@@
          public abstract boolean isDone();
          public abstract boolean isDoneForPartition();
  
 +        protected boolean isLive(Row row)
 +        {
-             return assumeLiveData || row.hasLiveData(nowInSec);
++            return assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness);
 +        }
 +
          @Override
          protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
          {
@@@ -439,16 -372,25 +461,19 @@@
  
          protected class CQLCounter extends Counter
          {
 -            protected final int nowInSec;
 -            protected final boolean assumeLiveData;
 -            protected final boolean countPartitionsWithOnlyStaticData;
 -
              protected int rowCounted;
              protected int rowInCurrentPartition;
 +            protected final boolean countPartitionsWithOnlyStaticData;
  
              protected boolean hasLiveStaticRow;
 -            private final boolean enforceStrictLiveness;
  
-             public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
+             public CQLCounter(int nowInSec,
+                               boolean assumeLiveData,
+                               boolean countPartitionsWithOnlyStaticData,
+                               boolean enforceStrictLiveness)
              {
-                 super(nowInSec, assumeLiveData);
 -                this.nowInSec = nowInSec;
 -                this.assumeLiveData = assumeLiveData;
++                super(nowInSec, assumeLiveData, enforceStrictLiveness);
                  this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
 -                this.enforceStrictLiveness = enforceStrictLiveness;
              }
  
              @Override
@@@ -566,15 -498,9 +591,15 @@@
          }
  
          @Override
 +        public DataLimits withoutState()
 +        {
 +            return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
+         public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
          {
-             return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
+             return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
          }
  
          private class PagingAwareCounter extends CQLCounter
@@@ -605,502 -534,6 +633,508 @@@
      }
  
      /**
 +     * <code>CQLLimits</code> used for GROUP BY queries or queries with aggregates.
 +     * <p>Internally, GROUP BY queries are always paginated by number of rows to avoid OOMExceptions. By consequence,
 +     * the limits keep track of the number of rows as well as the number of groups.</p>
 +     * <p>A group can only be counted if the next group or the end of the data is reached.</p>
 +     */
 +    private static class CQLGroupByLimits extends CQLLimits
 +    {
 +        /**
 +         * The <code>GroupMaker</code> state
 +         */
 +        protected final GroupingState state;
 +
 +        /**
 +         * The GROUP BY specification
 +         */
 +        protected final AggregationSpecification groupBySpec;
 +
 +        /**
 +         * The limit on the number of groups
 +         */
 +        protected final int groupLimit;
 +
 +        /**
 +         * The limit on the number of groups per partition
 +         */
 +        protected final int groupPerPartitionLimit;
 +
 +        public CQLGroupByLimits(int groupLimit,
 +                                int groupPerPartitionLimit,
 +                                int rowLimit,
 +                                AggregationSpecification groupBySpec)
 +        {
 +            this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE);
 +        }
 +
 +        private CQLGroupByLimits(int groupLimit,
 +                                 int groupPerPartitionLimit,
 +                                 int rowLimit,
 +                                 AggregationSpecification groupBySpec,
 +                                 GroupingState state)
 +        {
 +            super(rowLimit, NO_LIMIT, false);
 +            this.groupLimit = groupLimit;
 +            this.groupPerPartitionLimit = groupPerPartitionLimit;
 +            this.groupBySpec = groupBySpec;
 +            this.state = state;
 +        }
 +
 +        @Override
 +        public Kind kind()
 +        {
 +            return Kind.CQL_GROUP_BY_LIMIT;
 +        }
 +
 +        @Override
 +        public boolean isGroupByLimit()
 +        {
 +            return true;
 +        }
 +
 +        public boolean isUnlimited()
 +        {
 +            return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT;
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            return new CQLLimits(toFetch);
 +        }
 +
 +        @Override
 +        public float estimateTotalResults(ColumnFamilyStore cfs)
 +        {
 +            // For the moment, we return the estimated number of rows as we have no good way of estimating 
 +            // the number of groups that will be returned. Hopefully, we should be able to fix
 +            // that problem at some point.
 +            return super.estimateTotalResults(cfs);
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            return new CQLGroupByLimits(pageSize,
 +                                        groupPerPartitionLimit,
 +                                        rowLimit,
 +                                        groupBySpec,
 +                                        state);
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            return new CQLGroupByPagingLimits(pageSize,
 +                                              groupPerPartitionLimit,
 +                                              rowLimit,
 +                                              groupBySpec,
 +                                              state,
 +                                              lastReturnedKey,
 +                                              lastReturnedKeyRemaining);
 +        }
 +
 +        @Override
 +        public DataLimits forGroupByInternalPaging(GroupingState state)
 +        {
 +            return new CQLGroupByLimits(rowLimit,
 +                                        groupPerPartitionLimit,
 +                                        rowLimit,
 +                                        groupBySpec,
 +                                        state);
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
++        public Counter newCounter(int nowInSec,
++                                  boolean assumeLiveData,
++                                  boolean countPartitionsWithOnlyStaticData,
++                                  boolean enforceStrictLiveness)
 +        {
-             return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
++            return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
 +        }
 +
 +        @Override
 +        public int count()
 +        {
 +            return groupLimit;
 +        }
 +
 +        @Override
 +        public int perPartitionCount()
 +        {
 +            return groupPerPartitionLimit;
 +        }
 +
 +        @Override
 +        public DataLimits withoutState()
 +        {
 +            return state == GroupingState.EMPTY_STATE
 +                 ? this
 +                 : new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            StringBuilder sb = new StringBuilder();
 +
 +            if (groupLimit != NO_LIMIT)
 +            {
 +                sb.append("GROUP LIMIT ").append(groupLimit);
 +                if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT)
 +                    sb.append(' ');
 +            }
 +
 +            if (groupPerPartitionLimit != NO_LIMIT)
 +            {
 +                sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit);
 +                if (rowLimit != NO_LIMIT)
 +                    sb.append(' ');
 +            }
 +
 +            if (rowLimit != NO_LIMIT)
 +            {
 +                sb.append("LIMIT ").append(rowLimit);
 +            }
 +
 +            return sb.toString();
 +        }
 +
 +        @Override
 +        public boolean isExhausted(Counter counter)
 +        {
 +            return ((GroupByAwareCounter) counter).rowCounted < rowLimit
 +                    && counter.counted() < groupLimit;
 +        }
 +
 +        protected class GroupByAwareCounter extends Counter
 +        {
 +            private final GroupMaker groupMaker;
 +
 +            protected final boolean countPartitionsWithOnlyStaticData;
 +
 +            /**
 +             * The key of the partition being processed.
 +             */
 +            protected DecoratedKey currentPartitionKey;
 +
 +            /**
 +             * The number of rows counted so far.
 +             */
 +            protected int rowCounted;
 +
 +            /**
 +             * The number of rows counted so far in the current partition.
 +             */
 +            protected int rowCountedInCurrentPartition;
 +
 +            /**
 +             * The number of groups counted so far. A group is counted only once it is complete
 +             * (e.g the next one has been reached).
 +             */
 +            protected int groupCounted;
 +
 +            /**
 +             * The number of groups in the current partition.
 +             */
 +            protected int groupInCurrentPartition;
 +
 +            protected boolean hasGroupStarted;
 +
 +            protected boolean hasLiveStaticRow;
 +
 +            protected boolean hasReturnedRowsFromCurrentPartition;
 +
-             private GroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
++            private GroupByAwareCounter(int nowInSec,
++                                        boolean assumeLiveData,
++                                        boolean countPartitionsWithOnlyStaticData,
++                                        boolean enforceStrictLiveness)
 +            {
-                 super(nowInSec, assumeLiveData);
++                super(nowInSec, assumeLiveData, enforceStrictLiveness);
 +                this.groupMaker = groupBySpec.newGroupMaker(state);
 +                this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
 +
 +                // If the end of the partition was reached at the same time than the row limit, the last group might
 +                // not have been counted yet. Due to that we need to guess, based on the state, if the previous group
 +                // is still open.
 +                hasGroupStarted = state.hasClustering();
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                if (partitionKey.getKey().equals(state.partitionKey()))
 +                {
 +                    // The only case were we could have state.partitionKey() equals to the partition key
 +                    // is if some of the partition rows have been returned in the previous page but the
 +                    // partition was not exhausted (as the state partition key has not been updated yet).
 +                    // Since we know we have returned rows, we know we have accounted for
 +                    // the static row if any already, so force hasLiveStaticRow to false so we make sure to not count it
 +                    // once more.
 +                    hasLiveStaticRow = false;
 +                    hasReturnedRowsFromCurrentPartition = true;
 +                    hasGroupStarted = true;
 +                }
 +                else
 +                {
 +                    // We need to increment our count of groups if we have reached a new one and unless we had no new
 +                    // content added since we closed our last group (that is, if hasGroupStarted). Note that we may get
 +                    // here with hasGroupStarted == false in the following cases:
 +                    // * the partition limit was reached for the previous partition
 +                    // * the previous partition was containing only one static row
 +                    // * the rows of the last group of the previous partition were all marked as deleted
 +                    if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING))
 +                    {
 +                        incrementGroupCount();
 +                        // If we detect, before starting the new partition, that we are done, we need to increase
 +                        // the per partition group count of the previous partition as the next page will start from
 +                        // there.
 +                        if (isDone())
 +                            incrementGroupInCurrentPartitionCount();
 +                        hasGroupStarted = false;
 +                    }
 +                    hasReturnedRowsFromCurrentPartition = false;
 +                    hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
 +                }
 +                currentPartitionKey = partitionKey;
 +                // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
 +                // because the pager need to retrieve the count associated to the last value it has returned.
 +                if (!isDone())
 +                {
 +                    groupInCurrentPartition = 0;
 +                    rowCountedInCurrentPartition = 0;
 +                }
 +            }
 +
 +            @Override
 +            protected Row applyToStatic(Row row)
 +            {
 +                // It's possible that we're "done" if the partition we just started bumped the number of groups (in
 +                // applyToPartition() above), in which case Transformation will still call this method. In that case, we
 +                // want to ignore the static row, it should (and will) be returned with the next page/group if needs be.
 +                if (isDone())
 +                {
 +                    hasLiveStaticRow = false; // The row has not been returned
 +                    return Rows.EMPTY_STATIC_ROW;
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                // We want to check if the row belongs to a new group even if it has been deleted. The goal being
 +                // to minimize the chances of having to go through the same data twice if we detect on the next
 +                // non deleted row that we have reached the limit.
 +                if (groupMaker.isNewGroup(currentPartitionKey, row.clustering()))
 +                {
 +                    if (hasGroupStarted)
 +                    {
 +                        incrementGroupCount();
 +                        incrementGroupInCurrentPartitionCount();
 +                    }
 +                    hasGroupStarted = false;
 +                }
 +
 +                // That row may have made us increment the group count, which may mean we're done for this partition, in
 +                // which case we shouldn't count this row (it won't be returned).
 +                if (isDoneForPartition())
 +                {
 +                    hasGroupStarted = false;
 +                    return null;
 +                }
 +
 +                if (isLive(row))
 +                {
 +                    hasGroupStarted = true;
 +                    incrementRowCount();
 +                    hasReturnedRowsFromCurrentPartition = true;
 +                }
 +
 +                return row;
 +            }
 +
 +            @Override
 +            public int counted()
 +            {
 +                return groupCounted;
 +            }
 +
 +            @Override
 +            public int countedInCurrentPartition()
 +            {
 +                return groupInCurrentPartition;
 +            }
 +
 +            @Override
 +            public int rowCounted()
 +            {
 +                return rowCounted;
 +            }
 +
 +            @Override
 +            public int rowCountedInCurrentPartition()
 +            {
 +                return rowCountedInCurrentPartition;
 +            }
 +
 +            protected void incrementRowCount()
 +            {
 +                rowCountedInCurrentPartition++;
 +                if (++rowCounted >= rowLimit)
 +                    stop();
 +            }
 +
 +            private void incrementGroupCount()
 +            {
 +                groupCounted++;
 +                if (groupCounted >= groupLimit)
 +                    stop();
 +            }
 +
 +            private void incrementGroupInCurrentPartitionCount()
 +            {
 +                groupInCurrentPartition++;
 +                if (groupInCurrentPartition >= groupPerPartitionLimit)
 +                    stopInPartition();
 +            }
 +
 +            @Override
 +            public boolean isDoneForPartition()
 +            {
 +                return isDone() || groupInCurrentPartition >= groupPerPartitionLimit;
 +            }
 +
 +            @Override
 +            public boolean isDone()
 +            {
 +                return groupCounted >= groupLimit;
 +            }
 +
 +            @Override
 +            public void onPartitionClose()
 +            {
 +                // Normally, we don't count static rows as from a CQL point of view, it will be merge with other
 +                // rows in the partition. However, if we only have the static row, it will be returned as one group
 +                // so count it.
 +                if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition)
 +                {
 +                    incrementRowCount();
 +                    incrementGroupCount();
 +                    incrementGroupInCurrentPartitionCount();
 +                    hasGroupStarted = false;
 +                }
 +                super.onPartitionClose();
 +            }
 +
 +            @Override
 +            public void onClose()
 +            {
 +                // Groups are only counted when the end of the group is reached.
 +                // The end of a group is detected by 2 ways:
 +                // 1) a new group is reached
 +                // 2) the end of the data is reached
 +                // We know that the end of the data is reached if the group limit has not been reached
 +                // and the number of rows counted is smaller than the internal page size.
 +                if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
 +                {
 +                    incrementGroupCount();
 +                    incrementGroupInCurrentPartitionCount();
 +                }
 +
 +                super.onClose();
 +            }
 +        }
 +    }
 +
 +    private static class CQLGroupByPagingLimits extends CQLGroupByLimits
 +    {
 +        private final ByteBuffer lastReturnedKey;
 +
 +        private final int lastReturnedKeyRemaining;
 +
 +        public CQLGroupByPagingLimits(int groupLimit,
 +                                      int groupPerPartitionLimit,
 +                                      int rowLimit,
 +                                      AggregationSpecification groupBySpec,
 +                                      GroupingState state,
 +                                      ByteBuffer lastReturnedKey,
 +                                      int lastReturnedKeyRemaining)
 +        {
 +            super(groupLimit,
 +                  groupPerPartitionLimit,
 +                  rowLimit,
 +                  groupBySpec,
 +                  state);
 +
 +            this.lastReturnedKey = lastReturnedKey;
 +            this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
 +        }
 +
 +        @Override
 +        public Kind kind()
 +        {
 +            return Kind.CQL_GROUP_BY_PAGING_LIMIT;
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public DataLimits forGroupByInternalPaging(GroupingState state)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
 +        {
 +            assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey());
-             return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
++            return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
 +        }
 +
 +        @Override
 +        public DataLimits withoutState()
 +        {
 +            return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
 +        }
 +
 +        private class PagingGroupByAwareCounter extends GroupByAwareCounter
 +        {
-             private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
++            private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
 +            {
-                 super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
++                super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                if (partitionKey.getKey().equals(lastReturnedKey))
 +                {
 +                    currentPartitionKey = partitionKey;
 +                    groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining;
 +                    hasReturnedRowsFromCurrentPartition = true;
 +                    hasLiveStaticRow = false;
 +                    hasGroupStarted = state.hasClustering();
 +                }
 +                else
 +                {
 +                    super.applyToPartition(partitionKey, staticRow);
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
       * Limits used by thrift; this count partition and cells.
       */
      private static class ThriftLimits extends DataLimits
@@@ -1176,9 -609,9 +1210,9 @@@
              }
          }
  
-         public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
+         public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
          {
--            return new ThriftCounter(nowInSec, assumeLiveData);
++            return new ThriftCounter(nowInSec, assumeLiveData, enforceStrictLiveness);
          }
  
          public int count()
@@@ -1209,9 -640,10 +1243,9 @@@
              protected int cellsCounted;
              protected int cellsInCurrentPartition;
  
--            public ThriftCounter(int nowInSec, boolean assumeLiveData)
++            public ThriftCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
              {
-                 super(nowInSec, assumeLiveData);
 -                this.nowInSec = nowInSec;
 -                this.assumeLiveData = assumeLiveData;
++                super(nowInSec, assumeLiveData, enforceStrictLiveness);
              }
  
              @Override
@@@ -1324,9 -746,12 +1358,12 @@@
  
          protected class SuperColumnCountingCounter extends ThriftCounter
          {
-             public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData)
+             private final boolean enforceStrictLiveness;
+ 
+             public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
              {
--                super(nowInSec, assumeLiveData);
++                super(nowInSec, assumeLiveData, enforceStrictLiveness);
+                 this.enforceStrictLiveness = enforceStrictLiveness;
              }
  
              @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/rows/AbstractRow.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/db/view/ViewUpdateGenerator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/index/internal/composites/ClusteringColumnIndex.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index d5c0566,61bffe5..84e8685
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -43,12 -43,13 +43,14 @@@ public class DataResolver extends Respo
  {
      @VisibleForTesting
      final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 -
 +    private final long queryStartNanoTime;
+     private final boolean enforceStrictLiveness;
  
 -    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
+         this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
      }
  
      public PartitionIterator getData()
@@@ -127,10 -128,10 +129,10 @@@
              for (int i = 0; i < results.size(); i++)
              {
                  DataLimits.Counter singleResultCounter =
-                     command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
+                     command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
  
                  ShortReadResponseProtection protection =
 -                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter);
 +                    new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter, queryStartNanoTime);
  
                  /*
                   * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 12d7e19,5af2ad0..c57b691
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1678,7 -1597,10 +1678,10 @@@ public class StorageProxy implements St
          long start = System.nanoTime();
          try
          {
 -            PartitionIterator result = fetchRows(group.commands, consistencyLevel);
 +            PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime);
+             // Note that the only difference between the command in a group must be the partition key on which
+             // they applied.
+             boolean enforceStrictLiveness = group.commands.get(0).metadata().enforceStrictLiveness();
              // If we have more than one command, then despite each read command honoring the limit, the total result
              // might not honor it and so we should enforce it
              if (group.commands.size() > 1)
@@@ -2103,9 -1998,9 +2106,10 @@@
          private final PartitionRangeReadCommand command;
          private final Keyspace keyspace;
          private final ConsistencyLevel consistency;
+         private final boolean enforceStrictLiveness;
  
          private final long startTime;
 +        private final long queryStartNanoTime;
          private DataLimits.Counter counter;
          private PartitionIterator sentQueryIterator;
  
@@@ -2124,7 -2019,7 +2128,8 @@@
              this.totalRangeCount = ranges.rangeCount();
              this.consistency = consistency;
              this.keyspace = keyspace;
 +            this.queryStartNanoTime = queryStartNanoTime;
+             this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
          }
  
          public RowIterator computeNext()
@@@ -2280,9 -2166,10 +2285,10 @@@
  
          // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
  
 -        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)),
 +        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)),
                                         command.nowInSec(),
-                                        command.selectsFullPartition());
+                                        command.selectsFullPartition(),
+                                        command.metadata().enforceStrictLiveness());
      }
  
      public Map<String, List<String>> getSchemaVersions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index ef38242,f44aa24..f5134fa
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -30,7 -31,8 +30,8 @@@ abstract class AbstractQueryPager imple
  {
      protected final ReadCommand command;
      protected final DataLimits limits;
 -    protected final int protocolVersion;
 +    protected final ProtocolVersion protocolVersion;
+     private final boolean enforceStrictLiveness;
  
      private int remaining;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 1b56417,6bc1f80..1a70864
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@@ -54,9 -53,9 +54,9 @@@ public class QueryPager
          int count = 0;
          while (!pager.isExhausted())
          {
 -            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
 +            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime))
              {
-                 DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition());
+                 DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition(), metadata.enforceStrictLiveness());
                  PartitionIterators.consume(counter.applyTo(iter));
                  count += counter.counted();
              }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/test/long/org/apache/cassandra/cql3/CachingBench.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/cql3/CachingBench.java
index 370b3ff,0000000..25f746b
mode 100644,000000..100644
--- a/test/long/org/apache/cassandra/cql3/CachingBench.java
+++ b/test/long/org/apache/cassandra/cql3/CachingBench.java
@@@ -1,375 -1,0 +1,376 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Predicate;
 +
 +import com.google.common.collect.Iterables;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import junit.framework.Assert;
 +import org.apache.cassandra.config.Config.CommitLogSync;
 +import org.apache.cassandra.config.Config.DiskAccessMode;
 +import org.apache.cassandra.cache.ChunkCache;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.Unfiltered;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class CachingBench extends CQLTester
 +{
 +    private static final String STRATEGY = "LeveledCompactionStrategy";
 +
 +    private static final int DEL_SECTIONS = 1000;
 +    private static final int FLUSH_FREQ = 10000;
 +    private static final int SCAN_FREQUENCY_INV = 12000;
 +    static final int COUNT = 29000;
 +    static final int ITERS = 9;
 +
 +    static final int KEY_RANGE = 30;
 +    static final int CLUSTERING_RANGE = 210000;
 +
 +    static final int EXTRA_SIZE = 1025;
 +    static final boolean CONCURRENT_COMPACTIONS = true;
 +
 +    // The name of this method is important!
 +    // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we
 +    // are effectively overriding it.
 +    @BeforeClass
 +    public static void setUpClass()
 +    {
 +        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
 +        DatabaseDescriptor.setCommitLogSyncPeriod(100);
 +        CQLTester.setUpClass();
 +    }
 +    
 +    String hashQuery;
 +
 +    @Before
 +    public void before() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(" +
 +                    "  key int," +
 +                    "  column int," +
 +                    "  data int," +
 +                    "  extra text," +
 +                    "  PRIMARY KEY(key, column)" +
 +                    ")"
 +                   );
 +
 +        String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int",
 +                " CREATE FUNCTION %s (state int, val int)" +
 +                " CALLED ON NULL INPUT" +
 +                " RETURNS int" +
 +                " LANGUAGE java" +
 +                " AS 'return val != null ? state * 17 + val : state;'")).name;
 +        String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text",
 +                " CREATE FUNCTION %s (state int, val text)" +
 +                " CALLED ON NULL INPUT" +
 +                " RETURNS int" +
 +                " LANGUAGE java" +
 +                " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name;
 +
 +        String hashInt = createAggregate(KEYSPACE, "int",
 +                " CREATE AGGREGATE %s (int)" +
 +                " SFUNC " + hashIFunc +
 +                " STYPE int" +
 +                " INITCOND 1");
 +        String hashText = createAggregate(KEYSPACE, "text",
 +                " CREATE AGGREGATE %s (text)" +
 +                " SFUNC " + hashTFunc +
 +                " STYPE int" +
 +                " INITCOND 1");
 +
 +        hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s",
 +                                  hashInt, hashInt, hashInt, hashText);
 +    }
 +    AtomicLong id = new AtomicLong();
 +    long compactionTimeNanos = 0;
 +
 +    void pushData(Random rand, int count) throws Throwable
 +    {
 +        for (int i = 0; i < count; ++i)
 +        {
 +            long ii = id.incrementAndGet();
 +            if (ii % 1000 == 0)
 +                System.out.print('.');
 +            int key = rand.nextInt(KEY_RANGE);
 +            int column = rand.nextInt(CLUSTERING_RANGE);
 +            execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand));
 +            maybeCompact(ii);
 +        }
 +    }
 +
 +    private String genExtra(Random rand)
 +    {
 +        StringBuilder builder = new StringBuilder(EXTRA_SIZE);
 +        for (int i = 0; i < EXTRA_SIZE; ++i)
 +            builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1)));
 +        return builder.toString();
 +    }
 +
 +    void readAndDelete(Random rand, int count) throws Throwable
 +    {
 +        for (int i = 0; i < count; ++i)
 +        {
 +            int key;
 +            UntypedResultSet res;
 +            long ii = id.incrementAndGet();
 +            if (ii % 1000 == 0)
 +                System.out.print('-');
 +            if (rand.nextInt(SCAN_FREQUENCY_INV) != 1)
 +            {
 +                do
 +                {
 +                    key = rand.nextInt(KEY_RANGE);
 +                    long cid = rand.nextInt(DEL_SECTIONS);
 +                    int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
 +                    int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
 +                    res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend);
 +                } while (res.size() == 0);
 +                UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size()));
 +                int clustering = r.getInt("column");
 +                execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering);
 +            }
 +            else
 +            {
 +                execute(hashQuery);
 +            }
 +            maybeCompact(ii);
 +        }
 +    }
 +
 +    private void maybeCompact(long ii)
 +    {
 +        if (ii % FLUSH_FREQ == 0)
 +        {
 +            System.out.print("F");
 +            flush();
 +            if (ii % (FLUSH_FREQ * 10) == 0)
 +            {
 +                System.out.println("C");
 +                long startTime = System.nanoTime();
 +                getCurrentColumnFamilyStore().enableAutoCompaction(!CONCURRENT_COMPACTIONS);
 +                long endTime = System.nanoTime();
 +                compactionTimeNanos += endTime - startTime;
 +                getCurrentColumnFamilyStore().disableAutoCompaction();
 +            }
 +        }
 +    }
 +
 +    public void testSetup(String compactionClass, String compressorClass, DiskAccessMode mode, boolean cacheEnabled) throws Throwable
 +    {
 +        id.set(0);
 +        compactionTimeNanos = 0;
 +        ChunkCache.instance.enable(cacheEnabled);
 +        DatabaseDescriptor.setDiskAccessMode(mode);
 +        alterTable("ALTER TABLE %s WITH compaction = { 'class' :  '" + compactionClass + "'  };");
 +        alterTable("ALTER TABLE %s WITH compression = { 'sstable_compression' : '" + compressorClass + "'  };");
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        cfs.disableAutoCompaction();
 +
 +        long onStartTime = System.currentTimeMillis();
 +        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 +        List<Future<?>> tasks = new ArrayList<>();
 +        for (int ti = 0; ti < 1; ++ti)
 +        {
 +            Random rand = new Random(ti);
 +            tasks.add(es.submit(() -> 
 +            {
 +                for (int i = 0; i < ITERS; ++i)
 +                    try
 +                    {
 +                        pushData(rand, COUNT);
 +                        readAndDelete(rand, COUNT / 3);
 +                    }
 +                    catch (Throwable e)
 +                    {
 +                        throw new AssertionError(e);
 +                    }
 +            }));
 +        }
 +        for (Future<?> task : tasks)
 +            task.get();
 +
 +        flush();
 +        long onEndTime = System.currentTimeMillis();
 +        int startRowCount = countRows(cfs);
 +        int startTombCount = countTombstoneMarkers(cfs);
 +        int startRowDeletions = countRowDeletions(cfs);
 +        int startTableCount = cfs.getLiveSSTables().size();
 +        long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
 +        System.out.println("\nCompession: " + cfs.getCompressionParameters().toString());
 +        System.out.println("Reader " + cfs.getLiveSSTables().iterator().next().getFileDataInput(0).toString());
 +        if (cacheEnabled)
 +            System.out.format("Cache size %s requests %,d hit ratio %f\n",
 +                FileUtils.stringifyFileSize(ChunkCache.instance.metrics.size.getValue()),
 +                ChunkCache.instance.metrics.requests.getCount(),
 +                ChunkCache.instance.metrics.hitRate.getValue());
 +        else
 +        {
 +            Assert.assertTrue("Chunk cache had requests: " + ChunkCache.instance.metrics.requests.getCount(), ChunkCache.instance.metrics.requests.getCount() < COUNT);
 +            System.out.println("Cache disabled");
 +        }
 +        System.out.println(String.format("Operations completed in %.3fs", (onEndTime - onStartTime) * 1e-3));
 +        if (!CONCURRENT_COMPACTIONS)
 +            System.out.println(String.format(", out of which %.3f for non-concurrent compaction", compactionTimeNanos * 1e-9));
 +        else
 +            System.out.println();
 +
 +        String hashesBefore = getHashes();
 +        long startTime = System.currentTimeMillis();
 +        CompactionManager.instance.performMaximal(cfs, true);
 +        long endTime = System.currentTimeMillis();
 +
 +        int endRowCount = countRows(cfs);
 +        int endTombCount = countTombstoneMarkers(cfs);
 +        int endRowDeletions = countRowDeletions(cfs);
 +        int endTableCount = cfs.getLiveSSTables().size();
 +        long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
 +
 +        System.out.println(String.format("Major compaction completed in %.3fs",
 +                (endTime - startTime) * 1e-3));
 +        System.out.println(String.format("At start: %,12d tables %12s %,12d rows %,12d deleted rows %,12d tombstone markers",
 +                startTableCount, FileUtils.stringifyFileSize(startSize), startRowCount, startRowDeletions, startTombCount));
 +        System.out.println(String.format("At end:   %,12d tables %12s %,12d rows %,12d deleted rows %,12d tombstone markers",
 +                endTableCount, FileUtils.stringifyFileSize(endSize), endRowCount, endRowDeletions, endTombCount));
 +        String hashesAfter = getHashes();
 +
 +        Assert.assertEquals(hashesBefore, hashesAfter);
 +    }
 +
 +    private String getHashes() throws Throwable
 +    {
 +        long startTime = System.currentTimeMillis();
 +        String hashes = Arrays.toString(getRows(execute(hashQuery))[0]);
 +        long endTime = System.currentTimeMillis();
 +        System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3));
 +        return hashes;
 +    }
 +
 +    @Test
 +    public void testWarmup() throws Throwable
 +    {
 +        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false);
 +    }
 +
 +    @Test
 +    public void testLZ4CachedMmap() throws Throwable
 +    {
 +        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, true);
 +    }
 +
 +    @Test
 +    public void testLZ4CachedStandard() throws Throwable
 +    {
 +        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, true);
 +    }
 +
 +    @Test
 +    public void testLZ4UncachedMmap() throws Throwable
 +    {
 +        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.mmap, false);
 +    }
 +
 +    @Test
 +    public void testLZ4UncachedStandard() throws Throwable
 +    {
 +        testSetup(STRATEGY, "LZ4Compressor", DiskAccessMode.standard, false);
 +    }
 +
 +    @Test
 +    public void testCachedStandard() throws Throwable
 +    {
 +        testSetup(STRATEGY, "", DiskAccessMode.standard, true);
 +    }
 +
 +    @Test
 +    public void testUncachedStandard() throws Throwable
 +    {
 +        testSetup(STRATEGY, "", DiskAccessMode.standard, false);
 +    }
 +
 +    @Test
 +    public void testMmapped() throws Throwable
 +    {
 +        testSetup(STRATEGY, "", DiskAccessMode.mmap, false /* doesn't matter */);
 +    }
 +
 +    int countTombstoneMarkers(ColumnFamilyStore cfs)
 +    {
 +        return count(cfs, x -> x.isRangeTombstoneMarker());
 +    }
 +
 +    int countRowDeletions(ColumnFamilyStore cfs)
 +    {
 +        return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
 +    }
 +
 +    int countRows(ColumnFamilyStore cfs)
 +    {
++        boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness();
 +        int nowInSec = FBUtilities.nowInSeconds();
-         return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
++        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec, enforceStrictLiveness));
 +    }
 +
 +    private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
 +    {
 +        int count = 0;
 +        for (SSTableReader reader : cfs.getLiveSSTables())
 +            count += count(reader, predicate);
 +        return count;
 +    }
 +
 +    int count(SSTableReader reader, Predicate<Unfiltered> predicate)
 +    {
 +        int instances = 0;
 +        try (ISSTableScanner partitions = reader.getScanner())
 +        {
 +            while (partitions.hasNext())
 +            {
 +                try (UnfilteredRowIterator iter = partitions.next())
 +                {
 +                    while (iter.hasNext())
 +                    {
 +                        Unfiltered atom = iter.next();
 +                        if (predicate.test(atom))
 +                            ++instances;
 +                    }
 +                }
 +            }
 +        }
 +        return instances;
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e3d56ec/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
----------------------------------------------------------------------
diff --cc test/long/org/apache/cassandra/cql3/GcCompactionBench.java
index ca39b55,0000000..84c0384
mode 100644,000000..100644
--- a/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
+++ b/test/long/org/apache/cassandra/cql3/GcCompactionBench.java
@@@ -1,374 -1,0 +1,375 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.List;
 +import java.util.Random;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Predicate;
 +
 +import com.google.common.collect.Iterables;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import junit.framework.Assert;
 +import org.apache.cassandra.config.Config.CommitLogSync;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.compaction.CompactionManager;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.Unfiltered;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class GcCompactionBench extends CQLTester
 +{
 +    private static final String SIZE_TIERED_STRATEGY = "SizeTieredCompactionStrategy', 'min_sstable_size' : '0";
 +    private static final String LEVELED_STRATEGY = "LeveledCompactionStrategy', 'sstable_size_in_mb' : '16";
 +
 +    private static final int DEL_SECTIONS = 1000;
 +    private static final int FLUSH_FREQ = 10000;
 +    private static final int RANGE_FREQUENCY_INV = 16;
 +    static final int COUNT = 90000;
 +    static final int ITERS = 9;
 +
 +    static final int KEY_RANGE = 10;
 +    static final int CLUSTERING_RANGE = 210000;
 +
 +    static final int EXTRA_SIZE = 1025;
 +
 +    // The name of this method is important!
 +    // CommitLog settings must be applied before CQLTester sets up; by using the same name as its @BeforeClass method we
 +    // are effectively overriding it.
 +    @BeforeClass
 +    public static void setUpClass()     // overrides CQLTester.setUpClass()
 +    {
 +        DatabaseDescriptor.setCommitLogSync(CommitLogSync.periodic);
 +        DatabaseDescriptor.setCommitLogSyncPeriod(100);
 +        CQLTester.setUpClass();
 +    }
 +
 +    String hashQuery;
 +
 +    @Before
 +    public void before() throws Throwable
 +    {
 +        createTable("CREATE TABLE %s(" +
 +                    "  key int," +
 +                    "  column int," +
 +                    "  data int," +
 +                    "  extra text," +
 +                    "  PRIMARY KEY(key, column)" +
 +                    ")"
 +                   );
 +
 +        String hashIFunc = parseFunctionName(createFunction(KEYSPACE, "int, int",
 +                " CREATE FUNCTION %s (state int, val int)" +
 +                " CALLED ON NULL INPUT" +
 +                " RETURNS int" +
 +                " LANGUAGE java" +
 +                " AS 'return val != null ? state * 17 + val : state;'")).name;
 +        String hashTFunc = parseFunctionName(createFunction(KEYSPACE, "int, text",
 +                " CREATE FUNCTION %s (state int, val text)" +
 +                " CALLED ON NULL INPUT" +
 +                " RETURNS int" +
 +                " LANGUAGE java" +
 +                " AS 'return val != null ? state * 17 + val.hashCode() : state;'")).name;
 +
 +        String hashInt = createAggregate(KEYSPACE, "int",
 +                " CREATE AGGREGATE %s (int)" +
 +                " SFUNC " + hashIFunc +
 +                " STYPE int" +
 +                " INITCOND 1");
 +        String hashText = createAggregate(KEYSPACE, "text",
 +                " CREATE AGGREGATE %s (text)" +
 +                " SFUNC " + hashTFunc +
 +                " STYPE int" +
 +                " INITCOND 1");
 +
 +        hashQuery = String.format("SELECT count(column), %s(key), %s(column), %s(data), %s(extra), avg(key), avg(column), avg(data) FROM %%s",
 +                                  hashInt, hashInt, hashInt, hashText);
 +    }
 +    AtomicLong id = new AtomicLong();
 +    long compactionTimeNanos = 0;
 +
 +    void pushData(Random rand, int count) throws Throwable
 +    {
 +        for (int i = 0; i < count; ++i)
 +        {
 +            long ii = id.incrementAndGet();
 +            if (ii % 1000 == 0)
 +                System.out.print('.');
 +            int key = rand.nextInt(KEY_RANGE);
 +            int column = rand.nextInt(CLUSTERING_RANGE);
 +            execute("INSERT INTO %s (key, column, data, extra) VALUES (?, ?, ?, ?)", key, column, (int) ii, genExtra(rand));
 +            maybeCompact(ii);
 +        }
 +    }
 +
 +    private String genExtra(Random rand)
 +    {
 +        StringBuilder builder = new StringBuilder(EXTRA_SIZE);
 +        for (int i = 0; i < EXTRA_SIZE; ++i)
 +            builder.append((char) ('a' + rand.nextInt('z' - 'a' + 1)));
 +        return builder.toString();
 +    }
 +
 +    void deleteData(Random rand, int count) throws Throwable
 +    {
 +        for (int i = 0; i < count; ++i)
 +        {
 +            int key;
 +            UntypedResultSet res;
 +            long ii = id.incrementAndGet();
 +            if (ii % 1000 == 0)
 +                System.out.print('-');
 +            if (rand.nextInt(RANGE_FREQUENCY_INV) != 1)
 +            {
 +                do
 +                {
 +                    key = rand.nextInt(KEY_RANGE);
 +                    long cid = rand.nextInt(DEL_SECTIONS);
 +                    int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
 +                    int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
 +                    res = execute("SELECT column FROM %s WHERE key = ? AND column >= ? AND column < ? LIMIT 1", key, cstart, cend);
 +                } while (res.size() == 0);
 +                UntypedResultSet.Row r = Iterables.get(res, rand.nextInt(res.size()));
 +                int clustering = r.getInt("column");
 +                execute("DELETE FROM %s WHERE key = ? AND column = ?", key, clustering);
 +            }
 +            else
 +            {
 +                key = rand.nextInt(KEY_RANGE);
 +                long cid = rand.nextInt(DEL_SECTIONS);
 +                int cstart = (int) (cid * CLUSTERING_RANGE / DEL_SECTIONS);
 +                int cend = (int) ((cid + 1) * CLUSTERING_RANGE / DEL_SECTIONS);
 +                res = execute("DELETE FROM %s WHERE key = ? AND column >= ? AND column < ?", key, cstart, cend);
 +            }
 +            maybeCompact(ii);
 +        }
 +    }
 +
 +    private void maybeCompact(long ii)
 +    {
 +        if (ii % FLUSH_FREQ == 0)
 +        {
 +            System.out.print("F");
 +            flush();
 +            if (ii % (FLUSH_FREQ * 10) == 0)
 +            {
 +                System.out.println("C");
 +                long startTime = System.nanoTime();
 +                getCurrentColumnFamilyStore().enableAutoCompaction(true);
 +                long endTime = System.nanoTime();
 +                compactionTimeNanos += endTime - startTime;
 +                getCurrentColumnFamilyStore().disableAutoCompaction();
 +            }
 +        }
 +    }
 +
 +    public void testGcCompaction(TombstoneOption tombstoneOption, TombstoneOption backgroundTombstoneOption, String compactionClass) throws Throwable
 +    {
 +        id.set(0);
 +        compactionTimeNanos = 0;
 +        alterTable("ALTER TABLE %s WITH compaction = { 'class' :  '" + compactionClass + "', 'provide_overlapping_tombstones' : '" + backgroundTombstoneOption + "'  };");
 +        ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
 +        cfs.disableAutoCompaction();
 +
 +        long onStartTime = System.currentTimeMillis();
 +        ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 +        List<Future<?>> tasks = new ArrayList<>();
 +        for (int ti = 0; ti < 1; ++ti)
 +        {
 +            Random rand = new Random(ti);
 +            tasks.add(es.submit(() -> 
 +            {
 +                for (int i = 0; i < ITERS; ++i)
 +                    try
 +                    {
 +                        pushData(rand, COUNT);
 +                        deleteData(rand, COUNT / 3);
 +                    }
 +                    catch (Throwable e)
 +                    {
 +                        throw new AssertionError(e);
 +                    }
 +            }));
 +        }
 +        for (Future<?> task : tasks)
 +            task.get();
 +
 +        flush();
 +        long onEndTime = System.currentTimeMillis();
 +        int startRowCount = countRows(cfs);
 +        int startTombCount = countTombstoneMarkers(cfs);
 +        int startRowDeletions = countRowDeletions(cfs);
 +        int startTableCount = cfs.getLiveSSTables().size();
 +        long startSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
 +        System.out.println();
 +
 +        String hashesBefore = getHashes();
 +
 +        long startTime = System.currentTimeMillis();
 +        CompactionManager.instance.performGarbageCollection(cfs, tombstoneOption, 0);
 +        long endTime = System.currentTimeMillis();
 +
 +        int endRowCount = countRows(cfs);
 +        int endTombCount = countTombstoneMarkers(cfs);
 +        int endRowDeletions = countRowDeletions(cfs);
 +        int endTableCount = cfs.getLiveSSTables().size();
 +        long endSize = SSTableReader.getTotalBytes(cfs.getLiveSSTables());
 +
 +        System.out.println(cfs.getCompactionParametersJson());
 +        System.out.println(String.format("%s compactions completed in %.3fs",
 +                tombstoneOption.toString(), (endTime - startTime) * 1e-3));
 +        System.out.println(String.format("Operations completed in %.3fs, out of which %.3f for ongoing " + backgroundTombstoneOption + " background compactions",
 +                (onEndTime - onStartTime) * 1e-3, compactionTimeNanos * 1e-9));
 +        System.out.println(String.format("At start: %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
 +                startTableCount, startSize, startRowCount, startRowDeletions, startTombCount));
 +        System.out.println(String.format("At end:   %12d tables %12d bytes %12d rows %12d deleted rows %12d tombstone markers",
 +                endTableCount, endSize, endRowCount, endRowDeletions, endTombCount));
 +
 +        String hashesAfter = getHashes();
 +        Assert.assertEquals(hashesBefore, hashesAfter);
 +    }
 +
 +    private String getHashes() throws Throwable
 +    {
 +        long startTime = System.currentTimeMillis();
 +        String hashes = Arrays.toString(getRows(execute(hashQuery))[0]);
 +        long endTime = System.currentTimeMillis();
 +        System.out.println(String.format("Hashes: %s, retrieved in %.3fs", hashes, (endTime - startTime) * 1e-3));
 +        return hashes;
 +    }
 +
 +    @Test
 +    public void testCellAtEnd() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testRowAtEnd() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, LEVELED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testCellThroughout() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, LEVELED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testRowThroughout() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, LEVELED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testCopyCompaction() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, LEVELED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testCellAtEndSizeTiered() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.CELL, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testRowAtEndSizeTiered() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.ROW, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testCellThroughoutSizeTiered() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.CELL, TombstoneOption.CELL, SIZE_TIERED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testRowThroughoutSizeTiered() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.ROW, TombstoneOption.ROW, SIZE_TIERED_STRATEGY);
 +    }
 +
 +    @Test
 +    public void testCopyCompactionSizeTiered() throws Throwable
 +    {
 +        testGcCompaction(TombstoneOption.NONE, TombstoneOption.NONE, SIZE_TIERED_STRATEGY);
 +    }
 +
 +    int countTombstoneMarkers(ColumnFamilyStore cfs)
 +    {
 +        return count(cfs, x -> x.isRangeTombstoneMarker());
 +    }
 +
 +    int countRowDeletions(ColumnFamilyStore cfs)
 +    {
 +        return count(cfs, x -> x.isRow() && !((Row) x).deletion().isLive());
 +    }
 +
 +    int countRows(ColumnFamilyStore cfs)
 +    {
++        boolean enforceStrictLiveness = cfs.metadata.enforceStrictLiveness();
 +        int nowInSec = FBUtilities.nowInSeconds();
-         return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec));
++        return count(cfs, x -> x.isRow() && ((Row) x).hasLiveData(nowInSec, enforceStrictLiveness));
 +    }
 +
 +    private int count(ColumnFamilyStore cfs, Predicate<Unfiltered> predicate)
 +    {
 +        int count = 0;
 +        for (SSTableReader reader : cfs.getLiveSSTables())
 +            count += count(reader, predicate);
 +        return count;
 +    }
 +
 +    int count(SSTableReader reader, Predicate<Unfiltered> predicate)
 +    {
 +        int instances = 0;
 +        try (ISSTableScanner partitions = reader.getScanner())
 +        {
 +            while (partitions.hasNext())
 +            {
 +                try (UnfilteredRowIterator iter = partitions.next())
 +                {
 +                    while (iter.hasNext())
 +                    {
 +                        Unfiltered atom = iter.next();
 +                        if (predicate.test(atom))
 +                            ++instances;
 +                    }
 +                }
 +            }
 +        }
 +        return instances;
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message