cassandra-commits mailing list archives

From ble...@apache.org
Subject [3/6] cassandra git commit: Merge branch cassandra-2.2 into cassandra-3.0
Date Fri, 14 Jul 2017 15:30:23 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 60cfbba,0000000..c96a893
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -1,498 -1,0 +1,498 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service;
 +
 +import java.net.InetAddress;
 +import java.util.*;
 +import java.util.concurrent.TimeoutException;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.*;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.net.*;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +public class DataResolver extends ResponseResolver
 +{
 +    @VisibleForTesting
 +    final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 +
 +    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    {
 +        super(keyspace, command, consistency, maxResponseCount);
 +    }
 +
 +    public PartitionIterator getData()
 +    {
 +        ReadResponse response = responses.iterator().next().payload;
 +        return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec());
 +    }
 +
 +    public PartitionIterator resolve()
 +    {
 +        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not present
 +        // at the beginning of this method), so grab the response count once and use it throughout the method.
 +        int count = responses.size();
 +        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
 +        InetAddress[] sources = new InetAddress[count];
 +        for (int i = 0; i < count; i++)
 +        {
 +            MessageIn<ReadResponse> msg = responses.get(i);
 +            iters.add(msg.payload.makeIterator(command));
 +            sources[i] = msg.from;
 +        }
 +
 +        // Even though every response should honor the limit, we might have more than requested post-reconciliation,
 +        // so ensure we're respecting the limit.
-         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true);
++        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
 +        return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
 +    }
 +
 +    public void compareResponses()
 +    {
 +        // We need to fully consume the results to trigger read repairs if appropriate
 +        try (PartitionIterator iterator = resolve())
 +        {
 +            PartitionIterators.consume(iterator);
 +        }
 +    }
 +
 +    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
 +    {
 +        // If we have only one result, there is no read repair to do and we can't get short reads
 +        if (results.size() == 1)
 +            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
 +
 +        UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
 +
 +        // So-called "short reads" stem from nodes returning only a subset of the results they have for a partition due to the limit,
 +        // but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
 +        if (!command.limits().isUnlimited())
 +        {
 +            for (int i = 0; i < results.size(); i++)
 +                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
 +        }
 +
 +        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
 +    }
 +
 +    private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
 +    {
 +        private final InetAddress[] sources;
 +
 +        public RepairMergeListener(InetAddress[] sources)
 +        {
 +            this.sources = sources;
 +        }
 +
 +        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
 +        {
 +            return new MergeListener(partitionKey, columns(versions), isReversed(versions));
 +        }
 +
 +        private PartitionColumns columns(List<UnfilteredRowIterator> versions)
 +        {
 +            Columns statics = Columns.NONE;
 +            Columns regulars = Columns.NONE;
 +            for (UnfilteredRowIterator iter : versions)
 +            {
 +                if (iter == null)
 +                    continue;
 +
 +                PartitionColumns cols = iter.columns();
 +                statics = statics.mergeTo(cols.statics);
 +                regulars = regulars.mergeTo(cols.regulars);
 +            }
 +            return new PartitionColumns(statics, regulars);
 +        }
 +
 +        private boolean isReversed(List<UnfilteredRowIterator> versions)
 +        {
 +            for (UnfilteredRowIterator iter : versions)
 +            {
 +                if (iter == null)
 +                    continue;
 +
 +                // Everything will be in the same order
 +                return iter.isReverseOrder();
 +            }
 +
 +            assert false : "Expected at least one iterator";
 +            return false;
 +        }
 +
 +        public void close()
 +        {
 +            try
 +            {
 +                FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout());
 +            }
 +            catch (TimeoutException ex)
 +            {
 +                // We got all responses, but timed out while repairing
 +                int blockFor = consistency.blockFor(keyspace);
 +                if (Tracing.isTracing())
 +                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
 +                else
 +                    logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 +
 +                throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
 +            }
 +        }
 +
 +        private class MergeListener implements UnfilteredRowIterators.MergeListener
 +        {
 +            private final DecoratedKey partitionKey;
 +            private final PartitionColumns columns;
 +            private final boolean isReversed;
 +            private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length];
 +
 +            private final Row.Builder[] currentRows = new Row.Builder[sources.length];
 +            private final RowDiffListener diffListener;
 +
 +            // The partition-level deletion for the merged partition.
 +            private DeletionTime partitionLevelDeletion;
 +            // When the merged stream has a currently open range tombstone marker, its deletion time; null otherwise.
 +            private DeletionTime mergedDeletionTime;
 +            // For each source, the time of the current deletion as known by the source.
 +            private final DeletionTime[] sourceDeletionTime = new DeletionTime[sources.length];
 +            // For each source, record if there is an open range to send as repair, and from where.
 +            private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length];
 +
 +            public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
 +            {
 +                this.partitionKey = partitionKey;
 +                this.columns = columns;
 +                this.isReversed = isReversed;
 +
 +                this.diffListener = new RowDiffListener()
 +                {
 +                    public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
 +                    {
 +                        if (merged != null && !merged.equals(original))
 +                            currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
 +                    }
 +
 +                    public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original)
 +                    {
 +                        if (merged != null && !merged.equals(original))
 +                            currentRow(i, clustering).addRowDeletion(merged);
 +                    }
 +
 +                    public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
 +                    {
 +                        if (merged != null && !merged.equals(original))
 +                            currentRow(i, clustering).addComplexDeletion(column, merged);
 +                    }
 +
 +                    public void onCell(int i, Clustering clustering, Cell merged, Cell original)
 +                    {
 +                        if (merged != null && !merged.equals(original))
 +                            currentRow(i, clustering).addCell(merged);
 +                    }
 +
 +                };
 +            }
 +
 +            private PartitionUpdate update(int i)
 +            {
 +                if (repairs[i] == null)
 +                    repairs[i] = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
 +                return repairs[i];
 +            }
 +
 +            private Row.Builder currentRow(int i, Clustering clustering)
 +            {
 +                if (currentRows[i] == null)
 +                {
 +                    currentRows[i] = BTreeRow.sortedBuilder();
 +                    currentRows[i].newRow(clustering);
 +                }
 +                return currentRows[i];
 +            }
 +
 +            public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
 +            {
 +                this.partitionLevelDeletion = mergedDeletion;
 +                for (int i = 0; i < versions.length; i++)
 +                {
 +                    if (mergedDeletion.supersedes(versions[i]))
 +                        update(i).addPartitionDeletion(mergedDeletion);
 +                }
 +            }
 +
 +            public void onMergedRows(Row merged, Row[] versions)
 +            {
 +                // If a row was shadowed post-merge, it must be by a partition level or range tombstone, and we handle
 +                // those cases directly in their respective methods (in other words, it would be inefficient to send a row
 +                // deletion as repair when we know we've already sent a partition level or range tombstone that covers it).
 +                if (merged.isEmpty())
 +                    return;
 +
 +                Rows.diff(diffListener, merged, versions);
 +                for (int i = 0; i < currentRows.length; i++)
 +                {
 +                    if (currentRows[i] != null)
 +                        update(i).add(currentRows[i].build());
 +                }
 +                Arrays.fill(currentRows, null);
 +            }
 +
 +            private DeletionTime currentDeletion()
 +            {
 +                return mergedDeletionTime == null ? partitionLevelDeletion : mergedDeletionTime;
 +            }
 +
 +            public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
 +            {
 +                // The deletion in effect at the point where we process this marker.
 +                DeletionTime currentDeletion = currentDeletion();
 +
 +                for (int i = 0; i < versions.length; i++)
 +                {
 +                    RangeTombstoneMarker marker = versions[i];
 +
 +                    // Update what the source now thinks is the current deletion
 +                    if (marker != null)
 +                        sourceDeletionTime[i] = marker.isOpen(isReversed) ? marker.openDeletionTime(isReversed) : null;
 +
 +                    // If merged == null, at least one source is opening or closing a marker that the merged output does not have here
 +                    if (merged == null)
 +                    {
 +                        // but if it's not this source, move to the next one
 +                        if (marker == null)
 +                            continue;
 +
 +                        // We have a close and/or open marker for a source, with nothing corresponding in merged.
 +                        // Because merged is a superset, this implies that we have a current deletion (be it due to an
 +                        // early opening in merged or a partition level deletion) and that this deletion will still be
 +                        // active after that point. Further, whatever deletion was open or is opened by this marker on the
 +                        // source cannot supersede the current one.
 +                        //
 +                        // But while the marker deletion (before and/or after this point) cannot supersede the current
 +                        // deletion, we want to know if it's equal to it (both before and after), because in that case
 +                        // the source is up to date and we don't want to include it in the repair.
 +                        //
 +                        // So in practice we have 2 possible cases:
 +                        //  1) the source was up-to-date on deletion up to that point (markerToRepair[i] == null). Then
 +                        //     it won't be from that point on, unless it's a boundary and the newly opened deletion time
 +                        //     is also equal to the current deletion (note that this implies the boundary has the same
 +                        //     closing and opening deletion time, which should generally not happen, but can due to legacy
 +                        //     reading code not avoiding this for a while, see CASSANDRA-13237).
 +                        //  2) the source wasn't up-to-date on deletion up to that point (markerToRepair[i] != null), and
 +                        //     it may now be (if it isn't, we just have nothing to do for that marker).
 +                        assert !currentDeletion.isLive() : currentDeletion.toString();
 +
 +                        if (markerToRepair[i] == null)
 +                        {
 +                            // Since there is an ongoing merged deletion, the only way we don't have an open repair for
 +                            // this source is if it had a range open with the same deletion as the current one and is
 +                            // now closing it.
 +                            assert marker.isClose(isReversed) && currentDeletion.equals(marker.closeDeletionTime(isReversed))
 +                                 : String.format("currentDeletion=%s, marker=%s", currentDeletion, marker.toString(command.metadata()));
 +
 +                            // and so unless it's a boundary whose opening deletion time is still equal to the current
 +                            // deletion (see comment above for why this can actually happen), we have to repair the source
 +                            // from that point on.
 +                            if (!(marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed))))
 +                                markerToRepair[i] = marker.closeBound(isReversed).invert();
 +                        }
 +                        // In case 2) above, we only have something to do if the source is up-to-date after that point
 +                        else if (marker.isOpen(isReversed) && currentDeletion.equals(marker.openDeletionTime(isReversed)))
 +                        {
 +                            closeOpenMarker(i, marker.openBound(isReversed).invert());
 +                        }
 +                    }
 +                    else
 +                    {
 +                        // We have a change of current deletion in merged (potentially to/from no deletion at all).
 +
 +                        if (merged.isClose(isReversed))
 +                        {
 +                            // We're closing the merged range. If we've marked the source as needing to be repaired for
 +                            // that range, close and add it to the repair to be sent.
 +                            if (markerToRepair[i] != null)
 +                                closeOpenMarker(i, merged.closeBound(isReversed));
 +
 +                        }
 +
 +                        if (merged.isOpen(isReversed))
 +                        {
 +                            // If we're opening a new merged range (or just switching deletion), then unless the source
 +                            // is up to date on that deletion (note that we've updated what the source deletion is
 +                            // above), we'll have to send the range to the source.
 +                            DeletionTime newDeletion = merged.openDeletionTime(isReversed);
 +                            DeletionTime sourceDeletion = sourceDeletionTime[i];
 +                            if (!newDeletion.equals(sourceDeletion))
 +                                markerToRepair[i] = merged.openBound(isReversed);
 +                        }
 +                    }
 +                }
 +
 +                if (merged != null)
 +                    mergedDeletionTime = merged.isOpen(isReversed) ? merged.openDeletionTime(isReversed) : null;
 +            }
 +
 +            private void closeOpenMarker(int i, Slice.Bound close)
 +            {
 +                Slice.Bound open = markerToRepair[i];
 +                update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
 +                markerToRepair[i] = null;
 +            }
 +
 +            public void close()
 +            {
 +                for (int i = 0; i < repairs.length; i++)
 +                {
 +                    if (repairs[i] == null)
 +                        continue;
 +
 +                    // use a separate verb here because we don't want these to get the white-glove
 +                    // hint-on-timeout behavior that a "real" mutation gets
 +                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
 +                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
 +                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
 +                }
 +            }
 +        }
 +    }
 +
 +    private class ShortReadProtection extends Transformation<UnfilteredRowIterator>
 +    {
 +        private final InetAddress source;
 +        private final DataLimits.Counter counter;
 +        private final DataLimits.Counter postReconciliationCounter;
 +
 +        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter)
 +        {
 +            this.source = source;
-             this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount();
++            this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
 +            this.postReconciliationCounter = postReconciliationCounter;
 +        }
 +
 +        @Override
 +        public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
 +        {
 +            partition = Transformation.apply(partition, counter);
 +            // must apply and extend with the same protection instance
 +            ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey());
 +            partition = MoreRows.extend(partition, protection);
 +            partition = Transformation.apply(partition, protection); // apply after, so it is retained when we extend (in case we need to re-extend)
 +            return partition;
 +        }
 +
 +        private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator>
 +        {
 +            final CFMetaData metadata;
 +            final DecoratedKey partitionKey;
 +            Clustering lastClustering;
 +            int lastCount = 0;
 +
 +            private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey)
 +            {
 +                this.metadata = metadata;
 +                this.partitionKey = partitionKey;
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                lastClustering = row.clustering();
 +                return row;
 +            }
 +
 +            @Override
 +            public UnfilteredRowIterator moreContents()
 +            {
 +                // We have a short read if the node this result comes from has returned the requested number of
 +                // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
 +                // those results haven't made it into the final result post-reconciliation due to other nodes'
 +                // tombstones. If that is the case, then the node might have more results that we should fetch,
 +                // as otherwise we might return fewer results than required, or results that shouldn't be returned
 +                // (because the node has tombstones that hide results from other nodes but that haven't been
 +                // returned due to the limit).
 +                // Also note that we only get here once all the results for this node have been returned, so if
 +                // the node had returned the requested number but we still get here, it implies some results were
 +                // skipped during reconciliation.
 +                if (lastCount == counter.counted() || !counter.isDoneForPartition())
 +                    return null;
 +                lastCount = counter.counted();
 +
 +                assert !postReconciliationCounter.isDoneForPartition();
 +
 +                // We need to try to query enough additional results to fulfill our query, but because we could still
 +                // get short reads on that additional query, just querying the number of results we miss may not be
 +                // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition()), only
 +                // x rows (postReconciliationCounter.countedInCurrentPartition()) made it into the final result.
 +                // So our ratio of live rows to requested rows is x/n, and since we miss n-x rows, we estimate that
 +                // we should request m rows such that m * x/n = n-x, that is m = (n^2/x) - n.
 +                // Also note that it's ok if we retrieve more results than necessary since our top-level iterator is a
 +                // counting iterator.
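 +                // Worked example (illustrative, not part of the original commit): if this node returned
 +                // n = 10 rows but only x = 4 survived reconciliation, we request m = (10 * 10 / 4) - 10 = 15
 +                // additional rows, expecting roughly 15 * 4/10 = 6 of them to survive, which covers the
 +                // 6 rows we are missing.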
 +                int n = counter.countedInCurrentPartition();
 +                // x is clamped to 1 to avoid dividing by zero when none of this node's rows survived
 +                int x = Math.max(1, postReconciliationCounter.countedInCurrentPartition());
 +                int toQuery = Math.max(((n * n) / x) - n, 1);
 +
 +                DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
 +                ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey);
 +                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata.comparator, lastClustering, false);
 +                SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
 +                                                                                   command.nowInSec(),
 +                                                                                   command.columnFilter(),
 +                                                                                   command.rowFilter(),
 +                                                                                   retryLimits,
 +                                                                                   partitionKey,
 +                                                                                   retryFilter);
 +
 +                return doShortReadRetry(cmd);
 +            }
 +
 +            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
 +            {
 +                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
 +                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
 +                if (StorageProxy.canDoLocalRequest(source))
 +                      StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
 +                else
 +                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 +
 +                // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
 +                handler.awaitResults();
 +                assert resolver.responses.size() == 1;
 +                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand);
 +            }
 +        }
 +    }
 +
 +    public boolean isDataPresent()
 +    {
 +        return !responses.isEmpty();
 +    }
 +}
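
The recurring change in this merge is an extra boolean threaded through the DataLimits API: limit
counters and result filtering now also receive command.selectsFullPartition(). A minimal sketch of
the change in call shape, based only on the signatures visible in this diff (the meaning of the
second boolean is not shown in these hunks and is assumed):

    // Call shape before this merge (two arguments):
    DataLimits.Counter oldStyle = command.limits().newCounter(command.nowInSec(), true);

    // Call shape after this merge (three arguments): the counter is also told whether the query
    // selects the full partition, presumably so that limit accounting can treat such queries
    // differently (assumption; the rationale is not stated in these hunks).
    DataLimits.Counter newStyle = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());

    // The same flag is threaded into result filtering:
    PartitionIterator filtered = command.limits().filter(iter, command.nowInSec(), command.selectsFullPartition());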

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index ea082d5,7b7979d..6610cf7
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -1598,14 -1358,11 +1598,14 @@@ public class StorageProxy implements St
      throws UnavailableException, ReadFailureException, ReadTimeoutException
      {
          long start = System.nanoTime();
 -        List<Row> rows = null;
 -
          try
          {
 -            rows = fetchRows(commands, consistencyLevel);
 +            PartitionIterator result = fetchRows(group.commands, consistencyLevel);
 +            // If we have more than one command, then despite each read command honoring the limit, the total result
 +            // might not honor it and so we should enforce it
 +            if (group.commands.size() > 1)
-                 result = group.limits().filter(result, group.nowInSec());
++                result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition());
 +            return result;
          }
          catch (UnavailableException e)
          {
@@@ -1953,218 -1716,267 +1953,220 @@@
          }
      }
  
 -    public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
 -    throws UnavailableException, ReadFailureException, ReadTimeoutException
 +    private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
      {
 -        Tracing.trace("Computing ranges to query");
 -        long startTime = System.nanoTime();
 +        private final ReadCallback handler;
 +        private PartitionIterator result;
  
 -        Keyspace keyspace = Keyspace.open(command.keyspace);
 -        List<Row> rows;
 -        // now scan until we have enough results
 -        try
 +        private SingleRangeResponse(ReadCallback handler)
          {
 -            int liveRowCount = 0;
 -            boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
 -            rows = new ArrayList<>();
 +            this.handler = handler;
 +        }
  
 -            // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
 -            // expensive in clusters with vnodes)
 -            List<? extends AbstractBounds<RowPosition>> ranges;
 -            if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
 -                ranges = command.keyRange.unwrap();
 -            else
 -                ranges = getRestrictedRanges(command.keyRange);
 -
 -            // determine the number of rows to be fetched and the concurrency factor
 -            int rowsToBeFetched = command.limit();
 -            int concurrencyFactor;
 -            if (command.requiresScanningAllRanges())
 -            {
 -                // all nodes must be queried
 -                rowsToBeFetched *= ranges.size();
 -                concurrencyFactor = ranges.size();
 -                logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
 -                              ranges.size(), concurrencyFactor);
 +        private void waitForResponse() throws ReadTimeoutException
 +        {
 +            if (result != null)
 +                return;
 +
 +            try
 +            {
 +                result = handler.get();
              }
 -            else
 +            catch (DigestMismatchException e)
              {
 -                // our estimate of how many result rows there will be per-range
 -                float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
 -                // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
 -                // fetch enough rows in the first round
 -                resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
 -                concurrencyFactor = resultRowsPerRange == 0.0
 -                                  ? 1
 -                                  : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
 -
 -                logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 -                             resultRowsPerRange,
 -                             command.limit(),
 -                             ranges.size(),
 -                             concurrencyFactor);
 -                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
 -                              ranges.size(),
 -                              concurrencyFactor,
 -                              resultRowsPerRange);
 -            }
 -
 -            boolean haveSufficientRows = false;
 -            int i = 0;
 -            AbstractBounds<RowPosition> nextRange = null;
 -            List<InetAddress> nextEndpoints = null;
 -            List<InetAddress> nextFilteredEndpoints = null;
 -            while (i < ranges.size())
 -            {
 -                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
 -                int concurrentFetchStartingIndex = i;
 -                int concurrentRequests = 0;
 -                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
 -                {
 -                    AbstractBounds<RowPosition> range = nextRange == null
 -                                                      ? ranges.get(i)
 -                                                      : nextRange;
 -                    List<InetAddress> liveEndpoints = nextEndpoints == null
 -                                                    ? getLiveSortedEndpoints(keyspace, range.right)
 -                                                    : nextEndpoints;
 -                    List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
 -                                                        ? consistency_level.filterForQuery(keyspace, liveEndpoints)
 -                                                        : nextFilteredEndpoints;
 -                    ++i;
 -                    ++concurrentRequests;
 -
 -                    // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
 -                    // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
 -                    // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
 -                    while (i < ranges.size())
 -                    {
 -                        nextRange = ranges.get(i);
 -                        nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
 -                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
 -
 -                        // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
 -                        // don't know how to deal with a wrapping range.
 -                        // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
 -                        // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
 -                        // wire compatibility, so It's likely easier not to bother;
 -                        if (range.right.isMinimum())
 -                            break;
 -
 -                        List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
 -
 -                        // Check if there is enough endpoint for the merge to be possible.
 -                        if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
 -                            break;
 -
 -                        List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
 -
 -                        // Estimate whether merging will be a win or not
 -                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
 -                            break;
 -
 -                        // If we get there, merge this range and the next one
 -                        range = range.withNewRight(nextRange.right);
 -                        liveEndpoints = merged;
 -                        filteredEndpoints = filteredMerged;
 -                        ++i;
 -                    }
 -
 -                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
 +                throw new AssertionError(e); // no digests in range slices yet
 +            }
 +        }
  
 -                    // collect replies and resolve according to consistency level
 -                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
 -                    List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
 -                    handler.assureSufficientLiveNodes();
 -                    resolver.setSources(filteredEndpoints);
 -                    if (filteredEndpoints.size() == 1
 -                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
 -                    {
 -                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
 -                    }
 -                    else
 -                    {
 -                        MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
 -                        for (InetAddress endpoint : filteredEndpoints)
 -                        {
 -                            Tracing.trace("Enqueuing request to {}", endpoint);
 -                            MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 -                        }
 -                    }
 -                    scanHandlers.add(Pair.create(nodeCmd, handler));
 -                }
 -                Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
 +        protected RowIterator computeNext()
 +        {
 +            waitForResponse();
 +            return result.hasNext() ? result.next() : endOfData();
 +        }
  
 -                List<AsyncOneResponse> repairResponses = new ArrayList<>();
 -                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
 -                {
 -                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
 -                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
 +        public void close()
 +        {
 +            if (result != null)
 +                result.close();
 +        }
 +    }
  
 -                    try
 -                    {
 -                        for (Row row : handler.get())
 -                        {
 -                            rows.add(row);
 -                            if (countLiveRows)
 -                                liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
 -                        }
 -                        repairResponses.addAll(resolver.repairResults);
 -                    }
 -                    catch (ReadTimeoutException|ReadFailureException ex)
 -                    {
 -                        // we timed out or failed waiting for responses
 -                        int blockFor = consistency_level.blockFor(keyspace);
 -                        int responseCount = resolver.responses.size();
 -                        String gotData = responseCount > 0
 -                                         ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
 -                                         : "";
 -
 -                        boolean isTimeout = ex instanceof ReadTimeoutException;
 -                        if (Tracing.isTracing())
 -                        {
 -                            Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
 -                                          (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        else if (logger.isDebugEnabled())
 -                        {
 -                            logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
 -                                         (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
 -                        }
 -                        throw ex;
 -                    }
 -                    catch (DigestMismatchException e)
 -                    {
 -                        throw new AssertionError(e); // no digests in range slices yet
 -                    }
 +    private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
 +    {
 +        private final Iterator<RangeForQuery> ranges;
 +        private final int totalRangeCount;
 +        private final PartitionRangeReadCommand command;
 +        private final Keyspace keyspace;
 +        private final ConsistencyLevel consistency;
  
 -                    // if we're done, great, otherwise, move to the next range
 -                    int count = countLiveRows ? liveRowCount : rows.size();
 -                    if (count >= rowsToBeFetched)
 -                    {
 -                        haveSufficientRows = true;
 -                        break;
 -                    }
 -                }
 +        private final long startTime;
 +        private DataLimits.Counter counter;
 +        private PartitionIterator sentQueryIterator;
  
 -                try
 -                {
 -                    FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
 -                }
 -                catch (TimeoutException ex)
 -                {
 -                    // We got all responses, but timed out while repairing
 -                    int blockFor = consistency_level.blockFor(keyspace);
 -                    if (Tracing.isTracing())
 -                        Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
 -                    else
 -                        logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 -                    throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
 -                }
 +        private int concurrencyFactor;
 +        // The following two "metrics" are maintained to improve the concurrencyFactor
 +        // when the initial estimate was not good enough.
 +        private int liveReturned;
 +        private int rangesQueried;
  
 -                if (haveSufficientRows)
 -                    return command.postReconciliationProcessing(rows);
 +        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
 +        {
 +            this.command = command;
 +            this.concurrencyFactor = concurrencyFactor;
 +            this.startTime = System.nanoTime();
 +            this.ranges = new RangeMerger(ranges, keyspace, consistency);
 +            this.totalRangeCount = ranges.rangeCount();
 +            this.consistency = consistency;
 +            this.keyspace = keyspace;
 +        }
  
 -                // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
 -                // based on the results we've seen so far (as long as we still have ranges left to query)
 -                if (i < ranges.size())
 +        public RowIterator computeNext()
 +        {
 +            try
 +            {
 +                while (sentQueryIterator == null || !sentQueryIterator.hasNext())
                  {
 -                    float fetchedRows = countLiveRows ? liveRowCount : rows.size();
 -                    float remainingRows = rowsToBeFetched - fetchedRows;
 -                    float actualRowsPerRange;
 -                    if (fetchedRows == 0.0)
 -                    {
 -                        // we haven't actually gotten any results, so query all remaining ranges at once
 -                        actualRowsPerRange = 0.0f;
 -                        concurrencyFactor = ranges.size() - i;
 -                    }
 -                    else
 +                    // If we don't have more ranges to handle, we're done
 +                    if (!ranges.hasNext())
 +                        return endOfData();
 +
 +                    // otherwise, send the next batch of concurrent queries (after having closed the previous iterator)
 +                    if (sentQueryIterator != null)
                      {
 -                        actualRowsPerRange = fetchedRows / i;
 -                        concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
 +                        liveReturned += counter.counted();
 +                        sentQueryIterator.close();
 +
 +                        // It's not the first batch of queries and we're not done, so we can use what has been
 +                        // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
 +                        updateConcurrencyFactor();
                      }
 -                    logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 -                                 actualRowsPerRange, (int) remainingRows, concurrencyFactor);
 +                    sentQueryIterator = sendNextRequests();
                  }
 +
 +                return sentQueryIterator.next();
 +            }
 +            catch (UnavailableException e)
 +            {
 +                rangeMetrics.unavailables.mark();
 +                throw e;
 +            }
 +            catch (ReadTimeoutException e)
 +            {
 +                rangeMetrics.timeouts.mark();
 +                throw e;
 +            }
 +            catch (ReadFailureException e)
 +            {
 +                rangeMetrics.failures.mark();
 +                throw e;
              }
          }
 -        catch (ReadTimeoutException e)
 +
 +        private void updateConcurrencyFactor()
          {
 -            rangeMetrics.timeouts.mark();
 -            throw e;
 +            if (liveReturned == 0)
 +            {
 +                // we haven't actually gotten any results, so query all remaining ranges at once
 +                concurrencyFactor = totalRangeCount - rangesQueried;
 +                return;
 +            }
 +
 +            // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
 +            // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
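 +            // Illustrative example (not in the original commit): with limits().count() = 100, liveReturned = 20
 +            // and rangesQueried = 4, we get rowsPerRange = 5.0 and remainingRows = 80, so the next batch
 +            // queries min(totalRangeCount - rangesQueried, round(80 / 5.0)) = 16 ranges concurrently.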
 +            int remainingRows = command.limits().count() - liveReturned;
 +            float rowsPerRange = (float)liveReturned / (float)rangesQueried;
 +            concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
 +            logger.trace("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
 +                         rowsPerRange, (int) remainingRows, concurrencyFactor);
          }
 -        catch (UnavailableException e)
 +
 +        private SingleRangeResponse query(RangeForQuery toQuery)
          {
 -            rangeMetrics.unavailables.mark();
 -            throw e;
 +            PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
 +
 +            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
 +
 +            int blockFor = consistency.blockFor(keyspace);
 +            int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
 +            List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
 +            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
 +
 +            handler.assureSufficientLiveNodes();
 +
 +            if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
 +            {
 +                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
 +            }
 +            else
 +            {
 +                for (InetAddress endpoint : toQuery.filteredEndpoints)
 +                {
 +                    MessageOut<ReadCommand> message = rangeCommand.createMessage(MessagingService.instance().getVersion(endpoint));
 +                    Tracing.trace("Enqueuing request to {}", endpoint);
 +                    MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 +                }
 +            }
 +
 +            return new SingleRangeResponse(handler);
          }
 -        catch (ReadFailureException e)
 +
 +        private PartitionIterator sendNextRequests()
          {
 -            rangeMetrics.failures.mark();
 -            throw e;
 +            List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
 +            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
 +            {
 +                concurrentQueries.add(query(ranges.next()));
 +                ++rangesQueried;
 +            }
 +
 +            Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
 +            // We want to count the results for the sake of updating the concurrency factor (see updateConcurrencyFactor) but we don't want to
 +            // enforce any particular limit at this point (this could break code that relies on postReconciliationProcessing), hence the DataLimits.NONE.
-             counter = DataLimits.NONE.newCounter(command.nowInSec(), true);
++            counter = DataLimits.NONE.newCounter(command.nowInSec(), true, command.selectsFullPartition());
 +            return counter.applyTo(PartitionIterators.concat(concurrentQueries));
          }
 -        finally
 +
 +        public void close()
          {
 -            long latency = System.nanoTime() - startTime;
 -            rangeMetrics.addNano(latency);
 -            Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
 +            try
 +            {
 +                if (sentQueryIterator != null)
 +                    sentQueryIterator.close();
 +            }
 +            finally
 +            {
 +                long latency = System.nanoTime() - startTime;
 +                rangeMetrics.addNano(latency);
 +                Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
 +            }
          }
 -        return command.postReconciliationProcessing(rows);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
 +    {
 +        Tracing.trace("Computing ranges to query");
 +
 +        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
 +        RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
 +
 +        // our estimate of how many result rows there will be per-range
 +        float resultsPerRange = estimateResultsPerRange(command, keyspace);
 +        // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
 +        // fetch enough rows in the first round
 +        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
 +        int concurrencyFactor = resultsPerRange == 0.0
 +                              ? 1
 +                              : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
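 +        // Illustrative example (not in the original commit, and assuming CONCURRENT_SUBREQUESTS_MARGIN = 0.1):
 +        // with resultsPerRange estimated at 3.0, the margin lowers it to 2.7; a 100-row limit then yields
 +        // ceil(100 / 2.7) = 38 concurrent sub-range requests, capped at ranges.rangeCount().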
 +        logger.trace("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
 +                     resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
 +        Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
 +
 +        // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
 +
-         return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
++        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)),
++                                       command.nowInSec(),
++                                       command.selectsFullPartition());
      }
  
      public Map<String, List<String>> getSchemaVersions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 74ec47d,02623eb..ffd1b82
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -29,196 -34,367 +29,196 @@@ import org.apache.cassandra.service.Cli
  
  abstract class AbstractQueryPager implements QueryPager
  {
 -    private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class);
 -
 -    private final ConsistencyLevel consistencyLevel;
 -    private final boolean localQuery;
 -
 -    protected final CFMetaData cfm;
 -    protected final IDiskAtomFilter columnFilter;
 -    private final long timestamp;
 +    protected final ReadCommand command;
 +    protected final DataLimits limits;
 +    protected final int protocolVersion;
  
      private int remaining;
 -    private boolean exhausted;
 -    private boolean shouldFetchExtraRow;
 -
 -    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
 -                                 int toFetch,
 -                                 boolean localQuery,
 -                                 String keyspace,
 -                                 String columnFamily,
 -                                 IDiskAtomFilter columnFilter,
 -                                 long timestamp)
 -    {
 -        this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp);
 -    }
  
 -    protected AbstractQueryPager(ConsistencyLevel consistencyLevel,
 -                                 int toFetch,
 -                                 boolean localQuery,
 -                                 CFMetaData cfm,
 -                                 IDiskAtomFilter columnFilter,
 -                                 long timestamp)
 -    {
 -        this.consistencyLevel = consistencyLevel;
 -        this.localQuery = localQuery;
 -
 -        this.cfm = cfm;
 -        this.columnFilter = columnFilter;
 -        this.timestamp = timestamp;
 -
 -        this.remaining = toFetch;
 -    }
 -
 -
 -    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
 -    {
 -        if (isExhausted())
 -            return Collections.emptyList();
 -
 -        int currentPageSize = nextPageSize(pageSize);
 -        List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery));
 -
 -        if (rows.isEmpty())
 -        {
 -            logger.debug("Got empty set of rows, considering pager exhausted");
 -            exhausted = true;
 -            return Collections.emptyList();
 -        }
 -
 -        int liveCount = getPageLiveCount(rows);
 -        logger.debug("Fetched {} live rows", liveCount);
 -
 -        // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked
 -        // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient
 -        // but most of the time there should be nothing or very little to trim.
 -        if (liveCount > currentPageSize)
 -        {
 -            rows = discardLast(rows, liveCount - currentPageSize);
 -            liveCount = currentPageSize;
 -        }
 -
 -        remaining -= liveCount;
 -
 -        // If we've got less than requested, there is no more query to do (but
 -        // we still need to return the current page)
 -        if (liveCount < currentPageSize)
 -        {
 -            logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize);
 -            exhausted = true;
 -        }
 +    // This is the last key we've been reading from (or can still be reading within). This is the key for
 +    // which remainingInPartition makes sense: if we're starting another key, we should reset remainingInPartition
 +    // (and this is done in PagerIterator). This can be null (when we start).
 +    private DecoratedKey lastKey;
 +    private int remainingInPartition;
  
 -        // If it's not the first query and the first column is the last one returned (likely
 -        // but not certain since paging can race with deletes/expiration), then remove the
 -        // first column.
 -        if (containsPreviousLast(rows.get(0)))
 -        {
 -            rows = discardFirst(rows);
 -            remaining++;
 -        }
 -        // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size,
 -        // so if the page is full, trim the last entry
 -        else if (shouldFetchExtraRow && !exhausted)
 -        {
 -            // We've asked for one more than necessary
 -            rows = discardLast(rows);
 -            remaining++;
 -        }
 -
 -        logger.debug("Remaining rows to page: {}", remaining);
 -
 -        if (!isExhausted())
 -            shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1));
 -
 -        return rows;
 -    }
 +    private boolean exhausted;
  
 -    private List<Row> filterEmpty(List<Row> result)
 +    protected AbstractQueryPager(ReadCommand command, int protocolVersion)
      {
 -        for (Row row : result)
 -        {
 -            if (row.cf == null || !row.cf.hasColumns())
 -            {
 -                List<Row> newResult = new ArrayList<Row>(result.size() - 1);
 -                for (Row row2 : result)
 -                {
 -                    if (row2.cf == null || !row2.cf.hasColumns())
 -                        continue;
 +        this.command = command;
 +        this.protocolVersion = protocolVersion;
 +        this.limits = command.limits();
  
 -                    newResult.add(row2);
 -                }
 -                return newResult;
 -            }
 -        }
 -        return result;
 +        this.remaining = limits.count();
 +        this.remainingInPartition = limits.perPartitionCount();
      }
  
 -    protected void restoreState(int remaining, boolean shouldFetchExtraRow)
 +    public ReadOrderGroup startOrderGroup()
      {
 -        this.remaining = remaining;
 -        this.shouldFetchExtraRow = shouldFetchExtraRow;
 +        return command.startOrderGroup();
      }
  
 -    public boolean isExhausted()
 +    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
      {
 -        return exhausted || remaining == 0;
 -    }
 +        if (isExhausted())
 +            return EmptyIterators.partition();
  
 -    public int maxRemaining()
 -    {
 -        return remaining;
 +        pageSize = Math.min(pageSize, remaining);
 +        Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
 +        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
      }
  
 -    public long timestamp()
 +    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
      {
 -        return timestamp;
 -    }
 +        if (isExhausted())
 +            return EmptyIterators.partition();
  
 -    private int nextPageSize(int pageSize)
 -    {
 -        return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 1 : 0);
 +        pageSize = Math.min(pageSize, remaining);
 +        RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec());
 +        return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), pager);
      }
  
 -    public ColumnCounter columnCounter()
 +    public UnfilteredPartitionIterator fetchPageUnfiltered(CFMetaData cfm, int pageSize, ReadOrderGroup orderGroup)
      {
 -        return columnFilter.columnCounter(cfm.comparator, timestamp);
 -    }
 -
 -    protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException;
 -
 -    /**
 -     * Checks to see if the first row of a new page contains the last row from the previous page.
 -     * @param first the first row of the new page
 -     * @return true if <code>first</code> contains the last from from the previous page and it is live, false otherwise
 -     */
 -    protected abstract boolean containsPreviousLast(Row first);
 -
 -    /**
 -     * Saves the paging state by recording the last seen partition key and cell name (where applicable).
 -     * @param last the last row in the current page
 -     * @return true if an extra row should be fetched in the next page,false otherwise
 -     */
 -    protected abstract boolean recordLast(Row last);
 +        if (isExhausted())
 +            return EmptyIterators.unfilteredPartition(cfm, false);
  
 -    protected abstract boolean isReversed();
 +        pageSize = Math.min(pageSize, remaining);
 +        UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec());
  
 -    private List<Row> discardFirst(List<Row> rows)
 -    {
 -        return discardFirst(rows, 1);
 +        return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(orderGroup), pager);
      }
  
 -    @VisibleForTesting
 -    List<Row> discardFirst(List<Row> rows, int toDiscard)
 +    private class UnfilteredPager extends Pager<Unfiltered>
      {
 -        if (toDiscard == 0 || rows.isEmpty())
 -            return rows;
  
 -        int i = 0;
 -        DecoratedKey firstKey = null;
 -        ColumnFamily firstCf = null;
 -        while (toDiscard > 0 && i < rows.size())
 +        private UnfilteredPager(DataLimits pageLimits, int nowInSec)
          {
 -            Row first = rows.get(i++);
 -            firstKey = first.key;
 -            firstCf = first.cf.cloneMeShallow(isReversed());
 -            toDiscard -= isReversed()
 -                       ? discardLast(first.cf, toDiscard, firstCf)
 -                       : discardFirst(first.cf, toDiscard, firstCf);
 +            super(pageLimits, nowInSec);
          }
  
 -        // If there is less live data than to discard, all is discarded
 -        if (toDiscard > 0)
 -            return Collections.<Row>emptyList();
 -
 -        // i is the index of the first row that we are sure to keep. On top of that,
 -        // we also keep firstCf is it hasn't been fully emptied by the last iteration above.
 -        int count = firstCf.getColumnCount();
 -        int newSize = rows.size() - (count == 0 ? i : i - 1);
 -        List<Row> newRows = new ArrayList<Row>(newSize);
 -        if (count != 0)
 -            newRows.add(new Row(firstKey, firstCf));
 -        newRows.addAll(rows.subList(i, rows.size()));
 -
 -        return newRows;
 -    }
 -
 -    private List<Row> discardLast(List<Row> rows)
 -    {
 -        return discardLast(rows, 1);
 +        protected BaseRowIterator<Unfiltered> apply(BaseRowIterator<Unfiltered> partition)
 +        {
 +            return Transformation.apply(counter.applyTo((UnfilteredRowIterator) partition), this);
 +        }
      }
  
 -    @VisibleForTesting
 -    List<Row> discardLast(List<Row> rows, int toDiscard)
 +    private class RowPager extends Pager<Row>
      {
 -        if (toDiscard == 0 || rows.isEmpty())
 -            return rows;
  
 -        int i = rows.size()-1;
 -        DecoratedKey lastKey = null;
 -        ColumnFamily lastCf = null;
 -        while (toDiscard > 0 && i >= 0)
 +        private RowPager(DataLimits pageLimits, int nowInSec)
          {
 -            Row last = rows.get(i--);
 -            lastKey = last.key;
 -            lastCf = last.cf.cloneMeShallow(isReversed());
 -            toDiscard -= isReversed()
 -                       ? discardFirst(last.cf, toDiscard, lastCf)
 -                       : discardLast(last.cf, toDiscard, lastCf);
 +            super(pageLimits, nowInSec);
          }
  
 -        // If there is less live data than to discard, all is discarded
 -        if (toDiscard > 0)
 -            return Collections.<Row>emptyList();
 -
 -        // i is the index of the last row that we are sure to keep. On top of that,
 -        // we also keep lastCf is it hasn't been fully emptied by the last iteration above.
 -        int count = lastCf.getColumnCount();
 -        int newSize = count == 0 ? i+1 : i+2;
 -        List<Row> newRows = new ArrayList<Row>(newSize);
 -        newRows.addAll(rows.subList(0, i+1));
 -        if (count != 0)
 -            newRows.add(new Row(lastKey, lastCf));
 -
 -        return newRows;
 +        protected BaseRowIterator<Row> apply(BaseRowIterator<Row> partition)
 +        {
 +            return Transformation.apply(counter.applyTo((RowIterator) partition), this);
 +        }
      }
  
 -    private int getPageLiveCount(List<Row> page)
 +    private abstract class Pager<T extends Unfiltered> extends Transformation<BaseRowIterator<T>>
      {
 -        int count = 0;
 -        for (Row row : page)
 -            count += columnCounter().countAll(row.cf).live();
 -        return count;
 -    }
 +        private final DataLimits pageLimits;
 +        protected final DataLimits.Counter counter;
 +        private Row lastRow;
 +        private boolean isFirstPartition = true;
  
 -    private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
 -    {
 -        boolean isReversed = isReversed();
 -        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
 -        return isReversed
 -             ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester)
 -             : discardHead(toDiscard, newCf, cf.iterator(), tester);
 -    }
 +        private Pager(DataLimits pageLimits, int nowInSec)
 +        {
-             this.counter = pageLimits.newCounter(nowInSec, true);
++            this.counter = pageLimits.newCounter(nowInSec, true, command.selectsFullPartition());
 +            this.pageLimits = pageLimits;
 +        }
  
 -    private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf)
 -    {
 -        boolean isReversed = isReversed();
 -        DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed);
 -        return isReversed
 -             ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester)
 -             : discardTail(cf, toDiscard, newCf, cf.iterator(), tester);
 -    }
 +        @Override
 +        public BaseRowIterator<T> applyToPartition(BaseRowIterator<T> partition)
 +        {
 +            DecoratedKey key = partition.partitionKey();
 +            if (lastKey == null || !lastKey.equals(key))
 +                remainingInPartition = limits.perPartitionCount();
 +            lastKey = key;
 +
 +            // If this is the first partition of this page, it could be the continuation of a partition we started
 +            // on the previous page. In that case, the partition may have no more "regular" rows (which the page
 +            // size didn't let us know beforehand) but still have a static row. We should then skip the partition,
 +            // as returning it would suggest to the upper layer that the partition has "only" static columns,
 +            // which is not the case (and we know the static results have been sent on the previous page).
 +            if (isFirstPartition)
 +            {
 +                isFirstPartition = false;
 +                if (isPreviouslyReturnedPartition(key) && !partition.hasNext())
 +                {
 +                    partition.close();
 +                    return null;
 +                }
 +            }
  
 -    private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
 -    {
 -        ColumnCounter counter = columnCounter();
 +            return apply(partition);
 +        }
  
 -        List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size());
 +        protected abstract BaseRowIterator<T> apply(BaseRowIterator<T> partition);
  
 -        // Discard the first 'toDiscard' live, non-static cells
 -        while (iter.hasNext())
 +        @Override
 +        public void onClose()
          {
 -            Cell c = iter.next();
 -
 -            // if it's a static column, don't count it and save it to add to the trimmed results
 -            ColumnDefinition columnDef = cfm.getColumnDefinition(c.name());
 -            if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC)
 +            recordLast(lastKey, lastRow);
 +
 +            int counted = counter.counted();
 +            remaining -= counted;
 +            // If the clustering of the last row returned is a static one, it means that the partition contained
 +            // data only within its static columns. If the clustering of the last row returned is empty,
 +            // it means that there is only one row per partition. Therefore, in both cases there is no data remaining
 +            // within the partition.
 +            if (lastRow != null && (lastRow.clustering() == Clustering.STATIC_CLUSTERING
 +                    || lastRow.clustering() == Clustering.EMPTY))
              {
 -                staticCells.add(c);
 -                continue;
 +                remainingInPartition = 0;
              }
 -
 -            counter.count(c, tester);
 -
 -            // once we've discarded the required amount, add the rest
 -            if (counter.live() > toDiscard)
 +            else
              {
 -                for (Cell staticCell : staticCells)
 -                    copy.addColumn(staticCell);
 -
 -                copy.addColumn(c);
 -                while (iter.hasNext())
 -                    copy.addColumn(iter.next());
 +                remainingInPartition -= counter.countedInCurrentPartition();
              }
 +            exhausted = counted < pageLimits.count();
          }
 -        int live = counter.live();
 -        // We want to take into account the row even if it was containing only static columns
 -        if (live == 0 && !staticCells.isEmpty())
 -            live = 1;
 -        return Math.min(live, toDiscard);
 -    }
 -
 -    private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester)
 -    {
 -        // Redoing the counting like that is not extremely efficient.
 -        // This is called only for reversed slices or in the case of a race between
 -        // paging and a deletion (pretty unlikely), so this is probably acceptable.
 -        int liveCount = columnCounter().countAll(cf).live();
  
 -        if (liveCount == toDiscard)
 -            return toDiscard;
 -
 -        ColumnCounter counter = columnCounter();
 -        // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount - toDiscard')
 -        while (iter.hasNext())
 +        public Row applyToStatic(Row row)
          {
 -            Cell c = iter.next();
 -            counter.count(c, tester);
 -            if (counter.live() > liveCount - toDiscard)
 -                break;
 +            if (!row.isEmpty())
 +                lastRow = row;
 +            return row;
 +        }
  
 -            copy.addColumn(c);
 +        @Override
 +        public Row applyToRow(Row row)
 +        {
 +            lastRow = row;
 +            return row;
          }
 -        return Math.min(liveCount, toDiscard);
      }
  
 -    /**
 -     * Returns the first non-static cell in the ColumnFamily.  This is necessary to avoid recording a static column
 -     * as the "last" cell seen in a reversed query.  Because we will always query static columns alongside the normal
 -     * data for a page, they are not a good indicator of where paging should resume.  When we begin the next page, we
 -     * need to start from the last non-static cell.
 -     */
 -    protected Cell firstNonStaticCell(ColumnFamily cf)
 +    protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition)
      {
 -        for (Cell cell : cf)
 -        {
 -            ColumnDefinition def = cfm.getColumnDefinition(cell.name());
 -            if (def == null || def.kind != ColumnDefinition.Kind.STATIC)
 -                return cell;
 -        }
 -        return null;
 +        this.lastKey = lastKey;
 +        this.remaining = remaining;
 +        this.remainingInPartition = remainingInPartition;
      }
  
 -    protected static Cell lastCell(ColumnFamily cf)
 +    public boolean isExhausted()
      {
 -        return cf.getReverseSortedColumns().iterator().next();
 +        return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) && remainingInPartition == 0);
      }
 +
 +    public int maxRemaining()
 +    {
 +        return remaining;
 +    }
 +
 +    protected int remainingInPartition()
 +    {
 +        return remainingInPartition;
 +    }
 +
 +    protected abstract ReadCommand nextPageReadCommand(int pageSize);
 +    protected abstract void recordLast(DecoratedKey key, Row row);
 +    protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key);
  }

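The QueryPager contract that AbstractQueryPager now implements is easiest to see from the caller's side. A minimal usage sketch follows, assuming a constructed pager, a clientState, and a process(Row) consumer, none of which appear in this diff; the page size of 100 is arbitrary.

    // Usage sketch only; the iterator types come from org.apache.cassandra.db,
    // db.partitions and db.rows, and both iterators are closeable.
    while (!pager.isExhausted())
    {
        try (PartitionIterator page = pager.fetchPage(100, ConsistencyLevel.ONE, clientState))
        {
            while (page.hasNext())
            {
                try (RowIterator partition = page.next())
                {
                    while (partition.hasNext())
                        process(partition.next());  // assumed consumer
                }
            }
        }
    }
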
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 8caa14d,35d0971..11bbc0e
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@@ -38,24 -36,19 +38,26 @@@ import org.apache.cassandra.service.Cli
   *
   * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not
   * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the
 - * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't
 + * cfs meanPartitionSize to decide if parallelizing some of the commands might be worth it while being confident we don't
   * blow out memory.
   */
 -class MultiPartitionPager implements QueryPager
 +public class MultiPartitionPager implements QueryPager
  {
      private final SinglePartitionPager[] pagers;
 -    private final long timestamp;
 +    private final DataLimits limit;
++    private final boolean selectsFullPartitions;
 +
 +    private final int nowInSec;
  
      private int remaining;
      private int current;
  
 -    MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state, int limitForQuery)
 +    public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, int protocolVersion)
      {
 +        this.limit = group.limits();
 +        this.nowInSec = group.nowInSec();
++        this.selectsFullPartitions = group.selectsFullPartition();
 +
          int i = 0;
          // If it's not the beginning (state != null), we need to find where we were and skip previous commands
          // since they are done.
@@@ -70,15 -63,29 +72,16 @@@
              return;
          }
  
 -        pagers = new SinglePartitionPager[commands.size() - i];
 +        pagers = new SinglePartitionPager[group.commands.size() - i];
          // 'i' is on the first non-exhausted pager for the previous page (or the first one)
-         pagers[0] = group.commands.get(i).getPager(state, protocolVersion);
 -        pagers[0] = makePager(commands.get(i), consistencyLevel, cState, localQuery, state);
 -        timestamp = commands.get(i).timestamp;
++        SinglePartitionReadCommand command = group.commands.get(i);
++        pagers[0] = command.getPager(state, protocolVersion);
  
          // Following ones haven't been started yet
 -        for (int j = i + 1; j < commands.size(); j++)
 -        {
 -            ReadCommand command = commands.get(j);
 -            if (command.timestamp != timestamp)
 -                throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen.");
 -            pagers[j - i] = makePager(command, consistencyLevel, cState, localQuery, null);
 -        }
 +        for (int j = i + 1; j < group.commands.size(); j++)
 +            pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion);
  
 -        remaining = state == null ? limitForQuery : state.remaining;
 -    }
 -
 -    private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state)
 -    {
 -        return command instanceof SliceFromReadCommand
 -             ? new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, localQuery, state)
 -             : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, localQuery);
 +        remaining = state == null ? limit.count() : state.remaining;
      }
  
      public PagingState state()
@@@ -106,93 -113,35 +109,93 @@@
          return true;
      }
  
 -    public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException
 +    public ReadOrderGroup startOrderGroup()
      {
 -        List<Row> result = new ArrayList<Row>();
 -
 -        int remainingThisQuery = Math.min(remaining, pageSize);
 -        while (remainingThisQuery > 0 && !isExhausted())
 +        // Note that the only difference between the pagers is the partition key each applies to, so in practice we
 +        // can use any of the sub-pagers' ReadOrderGroups to protect the whole pager
 +        for (int i = current; i < pagers.length; i++)
          {
 -            // isExhausted has set us on the first non-exhausted pager
 -            List<Row> page = pagers[current].fetchPage(remainingThisQuery);
 -            if (page.isEmpty())
 -                continue;
 -
 -            Row row = page.get(0);
 -            int fetched = pagers[current].columnCounter().countAll(row.cf).live();
 -            remaining -= fetched;
 -            remainingThisQuery -= fetched;
 -            result.add(row);
 +            if (pagers[i] != null)
 +                return pagers[i].startOrderGroup();
          }
 +        throw new AssertionError("Shouldn't be called on an exhausted pager");
 +    }
  
 -        return result;
 +    @SuppressWarnings("resource") // iter closed via countingIter
 +    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
 +    {
 +        int toQuery = Math.min(remaining, pageSize);
 +        PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null);
-         DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
++        DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions);
 +        iter.setCounter(counter);
 +        return counter.applyTo(iter);
      }
  
 -    public int maxRemaining()
 +    @SuppressWarnings("resource") // iter closed via countingIter
 +    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException
      {
 -        return remaining;
 +        int toQuery = Math.min(remaining, pageSize);
 +        PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup);
-         DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
++        DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true, selectsFullPartitions);
 +        iter.setCounter(counter);
 +        return counter.applyTo(iter);
      }
  
 -    public long timestamp()
 +    private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
      {
 -        return timestamp;
 +        private final int pageSize;
 +        private PartitionIterator result;
 +        private DataLimits.Counter counter;
 +
 +        // For "normal" queries
 +        private final ConsistencyLevel consistency;
 +        private final ClientState clientState;
 +
 +        // For internal queries
 +        private final ReadOrderGroup orderGroup;
 +
 +        public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadOrderGroup orderGroup)
 +        {
 +            this.pageSize = pageSize;
 +            this.consistency = consistency;
 +            this.clientState = clientState;
 +            this.orderGroup = orderGroup;
 +        }
 +
 +        public void setCounter(DataLimits.Counter counter)
 +        {
 +            this.counter = counter;
 +        }
 +
 +        protected RowIterator computeNext()
 +        {
 +            while (result == null || !result.hasNext())
 +            {
 +                if (result != null)
 +                    result.close();
 +
 +                // This sets us on the first non-exhausted pager
 +                if (isExhausted())
 +                    return endOfData();
 +
 +                int toQuery = pageSize - counter.counted();
 +                result = consistency == null
 +                       ? pagers[current].fetchPageInternal(toQuery, orderGroup)
 +                       : pagers[current].fetchPage(toQuery, consistency, clientState);
 +            }
 +            return result.next();
 +        }
 +
 +        public void close()
 +        {
 +            remaining -= counter.counted();
 +            if (result != null)
 +                result.close();
 +        }
 +    }
 +
 +    public int maxRemaining()
 +    {
 +        return remaining;
      }
  }

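PagersIterator interleaves two jobs: advancing through the sub-pagers and holding the whole page to a shared budget via the counter (note how toQuery = pageSize - counter.counted() is recomputed on every refill). Stripped of the iterator plumbing, the control flow reduces to roughly the sketch below, where consumeAndCount is an invented helper that drains an iterator and returns the number of rows counted.

    // Simplified control flow of PagersIterator above; not the real code.
    // 'current', 'pagers', 'consistency' and 'clientState' are the
    // surrounding class's fields.
    int budget = pageSize;
    for (int i = current; i < pagers.length && budget > 0; i++)
    {
        if (pagers[i] == null || pagers[i].isExhausted())
            continue;
        try (PartitionIterator part = pagers[i].fetchPage(budget, consistency, clientState))
        {
            budget -= consumeAndCount(part);  // hypothetical helper
        }
    }
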
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 02b5de2,f933ccb..c26bf3f
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@@ -36,30 -41,156 +36,30 @@@ public class QueryPager
      /**
       * Convenience method that count (live) cells/rows for a given slice of a row, but page underneath.
       */
 -    public static int countPaged(String keyspace,
 -                                 String columnFamily,
 -                                 ByteBuffer key,
 -                                 SliceQueryFilter filter,
 +    public static int countPaged(CFMetaData metadata,
 +                                 DecoratedKey key,
 +                                 ColumnFilter columnFilter,
 +                                 ClusteringIndexFilter filter,
 +                                 DataLimits limits,
                                   ConsistencyLevel consistencyLevel,
 -                                 ClientState cState,
 +                                 ClientState state,
                                   final int pageSize,
 -                                 long now) throws RequestValidationException, RequestExecutionException
 +                                 int nowInSec,
 +                                 boolean isForThrift) throws RequestValidationException, RequestExecutionException
      {
 -        SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter);
 -        final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, cState, false);
 +        SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
 +        final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION);
  
 -        ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now);
 +        int count = 0;
          while (!pager.isExhausted())
          {
 -            List<Row> next = pager.fetchPage(pageSize);
 -            if (!next.isEmpty())
 -                counter.countAll(next.get(0).cf);
 +            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
 +            {
-                 DataLimits.Counter counter = limits.newCounter(nowInSec, true);
++                DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition());
 +                PartitionIterators.consume(counter.applyTo(iter));
 +                count += counter.counted();
 +            }
          }
 -        return counter.live();
 +        return count;
      }
  }

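Two details in the rewritten countPaged are easy to miss: a fresh DataLimits.Counter is created for every page, because a counter accumulates its counted() total and reusing one across pages would conflate them, and the counter is applied as a decorator (counter.applyTo(iter)) so that consuming and counting happen in a single pass. In isolation the pattern is:

    // Skeleton of the page-and-count pattern; mirrors the method above.
    int total = 0;
    while (!pager.isExhausted())
    {
        try (PartitionIterator page = pager.fetchPage(pageSize, consistencyLevel, state))
        {
            DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition());
            PartitionIterators.consume(counter.applyTo(page));
            total += counter.counted();
        }
    }
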
http://git-wip-us.apache.org/repos/asf/cassandra/blob/88d2ac4f/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 11d9e19,416a4b2..ba23c67
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -716,17 -549,14 +716,17 @@@ public abstract class CQLTeste
  
      protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable
      {
 -        requireNetwork();
 -
 -        return session[protocolVersion-1].execute(formatQuery(query), values);
 +        return sessionNet(protocolVersion).execute(formatQuery(query), values);
      }
  
-     protected Session sessionNet()
+     protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
      {
-         return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1));
 -        return sessionNet(maxProtocolVersion).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
++        return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
 +    }
 +
-     protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
++    protected Session sessionNet()
 +    {
-         return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
++        return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1));
      }
  
      protected Session sessionNet(int protocolVersion)


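From the call site, the reordered CQLTester helpers compose with the DataStax driver's fetch size to force multi-page reads. A hypothetical test might look like the following; the query is illustrative, and formatQuery substitutes the current test table for %s.

    // Hypothetical test usage; a fetch size of 2 forces the driver to page,
    // exercising the server-side pagers changed earlier in this commit.
    com.datastax.driver.core.ResultSet rs = executeNetWithPaging("SELECT * FROM %s", 2);
    int seen = 0;
    for (com.datastax.driver.core.Row row : rs)
        seen++;  // the driver requests each next page transparently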