cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [12/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:36 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 032765a..17a9c11 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -41,8 +41,9 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
-import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexSearcher;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -195,13 +196,13 @@ public class StorageProxy implements StorageProxyMBean
      * @return null if the operation succeeds in updating the row, or the current values corresponding to conditions.
      * (since, if the CAS doesn't succeed, it means the current value do not match the conditions).
      */
-    public static ColumnFamily cas(String keyspaceName,
-                                   String cfName,
-                                   ByteBuffer key,
-                                   CASRequest request,
-                                   ConsistencyLevel consistencyForPaxos,
-                                   ConsistencyLevel consistencyForCommit,
-                                   ClientState state)
+    public static RowIterator cas(String keyspaceName,
+                                  String cfName,
+                                  DecoratedKey key,
+                                  CASRequest request,
+                                  ConsistencyLevel consistencyForPaxos,
+                                  ConsistencyLevel consistencyForCommit,
+                                  ClientState state)
     throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
     {
         final long start = System.nanoTime();
@@ -217,32 +218,35 @@ public class StorageProxy implements StorageProxyMBean
             while (System.nanoTime() - start < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the start of each attempt
-                Pair<List<InetAddress>, Integer> p = getPaxosParticipants(keyspaceName, key, consistencyForPaxos);
+                Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
                 List<InetAddress> liveEndpoints = p.left;
                 int requiredParticipants = p.right;
 
                 final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.left;
                 contentions += pair.right;
+
                 // read the current values and check they validate the conditions
                 Tracing.trace("Reading existing values for CAS precondition");
-                long timestamp = System.currentTimeMillis();
-                ReadCommand readCommand = ReadCommand.create(keyspaceName, key, cfName, timestamp, request.readFilter());
-                List<Row> rows = read(Arrays.asList(readCommand), consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL
-                                                                  ? ConsistencyLevel.LOCAL_QUORUM
-                                                                  : ConsistencyLevel.QUORUM);
-                ColumnFamily current = rows.get(0).cf;
+                SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds());
+                ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
+
+                FilteredPartition current;
+                try (RowIterator rowIter = readOne(readCommand, readConsistency))
+                {
+                    current = FilteredPartition.create(rowIter);
+                }
+
                 if (!request.appliesTo(current))
                 {
                     Tracing.trace("CAS precondition does not match current values {}", current);
-                    // We should not return null as this means success
                     casWriteMetrics.conditionNotMet.inc();
-                    return current == null ? ArrayBackedSortedColumns.factory.create(metadata) : current;
+                    return current.rowIterator();
                 }
 
                 // finish the paxos round w/ the desired updates
                 // TODO turn null updates into delete?
-                ColumnFamily updates = request.makeUpdates(current);
+                PartitionUpdate updates = request.makeUpdates(current);
 
                 // Apply triggers to cas updates. A consideration here is that
                 // triggers emit Mutations, and so a given trigger implementation
@@ -251,9 +255,10 @@ public class StorageProxy implements StorageProxyMBean
                 // validate that the generated mutations are targetted at the same
                 // partition as the initial updates and reject (via an
                 // InvalidRequestException) any which aren't.
-                updates = TriggerExecutor.instance.execute(key, updates);
+                updates = TriggerExecutor.instance.execute(updates);
+
 
-                Commit proposal = Commit.newProposal(key, ballot, updates);
+                Commit proposal = Commit.newProposal(ballot, updates);
                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
                 if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
                 {
@@ -305,12 +310,11 @@ public class StorageProxy implements StorageProxyMBean
         };
     }
 
-    private static Pair<List<InetAddress>, Integer> getPaxosParticipants(String keyspaceName, ByteBuffer key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+    private static Pair<List<InetAddress>, Integer> getPaxosParticipants(CFMetaData cfm, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
     {
-        Token tk = StorageService.getPartitioner().getToken(key);
-        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
-        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-
+        Token tk = key.getToken();
+        List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(cfm.ksName, tk);
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, cfm.ksName);
         if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
         {
             // Restrict naturalEndpoints and pendingEndpoints to node in the local DC only
@@ -344,7 +348,7 @@ public class StorageProxy implements StorageProxyMBean
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
     private static Pair<UUID, Integer> beginAndRepairPaxos(long start,
-                                                           ByteBuffer key,
+                                                           DecoratedKey key,
                                                            CFMetaData metadata,
                                                            List<InetAddress> liveEndpoints,
                                                            int requiredParticipants,
@@ -393,7 +397,7 @@ public class StorageProxy implements StorageProxyMBean
                     casWriteMetrics.unfinishedCommit.inc();
                 else
                     casReadMetrics.unfinishedCommit.inc();
-                Commit refreshedInProgress = Commit.newProposal(inProgress.key, ballot, inProgress.update);
+                Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
                 if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos))
                 {
                     try
@@ -451,7 +455,7 @@ public class StorageProxy implements StorageProxyMBean
     private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new PrepareCallback(toPrepare.key, toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos);
+        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
@@ -483,7 +487,7 @@ public class StorageProxy implements StorageProxyMBean
         boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
 
-        Token tk = StorageService.getPartitioner().getToken(proposal.key);
+        Token tk = proposal.update.partitionKey().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName());
 
@@ -604,7 +608,7 @@ public class StorageProxy implements StorageProxyMBean
             if (mutation instanceof CounterMutation)
                 continue;
 
-            Token tk = StorageService.getPartitioner().getToken(mutation.key());
+            Token tk = mutation.key().getToken();
             List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
             Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
             for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
@@ -699,6 +703,12 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    public static boolean canDoLocalRequest(InetAddress replica)
+    {
+        return replica.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS;
+    }
+
+
     private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
     throws WriteTimeoutException, WriteFailureException
     {
@@ -714,7 +724,7 @@ public class StorageProxy implements StorageProxyMBean
         for (InetAddress target : endpoints)
         {
             int targetVersion = MessagingService.instance().getVersion(target);
-            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+            if (canDoLocalRequest(target))
             {
                 insertLocal(message.payload, handler);
             }
@@ -743,12 +753,12 @@ public class StorageProxy implements StorageProxyMBean
                                                                         Keyspace.open(SystemKeyspace.NAME),
                                                                         null,
                                                                         WriteType.SIMPLE);
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, UUIDType.instance.decompose(uuid));
-        mutation.delete(SystemKeyspace.BATCHLOG, FBUtilities.timestampMicros());
+        Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid)));
+        mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
         MessageOut<Mutation> message = mutation.createMessage();
         for (InetAddress target : endpoints)
         {
-            if (target.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+            if (canDoLocalRequest(target))
                 insertLocal(message.payload, handler);
             else
                 MessagingService.instance().sendRR(message, target, handler, false);
@@ -793,7 +803,7 @@ public class StorageProxy implements StorageProxyMBean
         String keyspaceName = mutation.getKeyspaceName();
         AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
 
-        Token tk = StorageService.getPartitioner().getToken(mutation.key());
+        Token tk = mutation.key().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
@@ -811,7 +821,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
         String keyspaceName = mutation.getKeyspaceName();
-        Token tk = StorageService.getPartitioner().getToken(mutation.key());
+        Token tk = mutation.key().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
         AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
@@ -903,7 +913,7 @@ public class StorageProxy implements StorageProxyMBean
 
             if (FailureDetector.instance.isAlive(destination))
             {
-                if (destination.equals(FBUtilities.getBroadcastAddress()) && OPTIMIZE_LOCAL_REQUESTS)
+                if (canDoLocalRequest(destination))
                 {
                     insertLocal = true;
                 } else
@@ -1064,7 +1074,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 catch (Exception ex)
                 {
-                    logger.error("Failed to apply mutation locally : {}", ex.getMessage());
+                    logger.error("Failed to apply mutation locally : {}", ex);
                     responseHandler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
@@ -1098,7 +1108,7 @@ public class StorageProxy implements StorageProxyMBean
             // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
             String keyspaceName = cm.getKeyspaceName();
             AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
-            Token tk = StorageService.getPartitioner().getToken(cm.key());
+            Token tk = cm.key().getToken();
             List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
             Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
@@ -1123,7 +1133,7 @@ public class StorageProxy implements StorageProxyMBean
      * is unclear we want to mix those latencies with read latencies, so this
      * may be a bit involved.
      */
-    private static InetAddress findSuitableEndpoint(String keyspaceName, ByteBuffer key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
+    private static InetAddress findSuitableEndpoint(String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -1189,57 +1199,69 @@ public class StorageProxy implements StorageProxyMBean
         };
     }
 
-    private static boolean systemKeyspaceQuery(List<ReadCommand> cmds)
+    private static boolean systemKeyspaceQuery(List<? extends ReadCommand> cmds)
     {
         for (ReadCommand cmd : cmds)
-            if (!cmd.ksName.equals(SystemKeyspace.NAME))
+            if (!cmd.metadata().ksName.equals(SystemKeyspace.NAME))
                 return false;
         return true;
     }
 
-    public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
+    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel)
+    throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
+    {
+        return readOne(command, consistencyLevel, null);
+    }
+
+    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state)
+    throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
+    {
+        return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state), command);
+    }
+
+    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
         // When using serial CL, the ClientState should be provided
         assert !consistencyLevel.isSerialConsistency();
-        return read(commands, consistencyLevel, null);
+        return read(group, consistencyLevel, null);
     }
 
     /**
      * Performs the actual reading of a row out of the StorageService, fetching
      * a specific set of column names from a given column family.
      */
-    public static List<Row> read(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
+    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
-        if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(commands))
+        if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands))
         {
             readMetrics.unavailables.mark();
             throw new IsBootstrappingException();
         }
 
         return consistencyLevel.isSerialConsistency()
-             ? readWithPaxos(commands, consistencyLevel, state)
-             : readRegular(commands, consistencyLevel);
+             ? readWithPaxos(group, consistencyLevel, state)
+             : readRegular(group, consistencyLevel);
     }
 
-    private static List<Row> readWithPaxos(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState state)
+    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
     throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
         assert state != null;
+        if (group.commands.size() > 1)
+            throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time");
 
         long start = System.nanoTime();
-        List<Row> rows = null;
+        SinglePartitionReadCommand command = group.commands.get(0);
+        CFMetaData metadata = command.metadata();
+        DecoratedKey key = command.partitionKey();
 
+        PartitionIterator result = null;
         try
         {
             // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
-            if (commands.size() > 1)
-                throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one row at a time");
-            ReadCommand command = commands.get(0);
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(command.ksName, command.cfName);
-            Pair<List<InetAddress>, Integer> p = getPaxosParticipants(command.ksName, command.key, consistencyLevel);
+            Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyLevel);
             List<InetAddress> liveEndpoints = p.left;
             int requiredParticipants = p.right;
 
@@ -1247,22 +1269,23 @@ public class StorageProxy implements StorageProxyMBean
             final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
                                                                                    ? ConsistencyLevel.LOCAL_QUORUM
                                                                                    : ConsistencyLevel.QUORUM;
+
             try
             {
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, command.key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state);
+                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyLevel, consistencyForCommitOrFetch, false, state);
                 if (pair.right > 0)
                     casReadMetrics.contention.update(pair.right);
             }
             catch (WriteTimeoutException e)
             {
-                throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(command.ksName)), false);
+                throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(metadata.ksName)), false);
             }
             catch (WriteFailureException e)
             {
                 throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false);
             }
 
-            rows = fetchRows(commands, consistencyForCommitOrFetch);
+            result = fetchRows(group.commands, consistencyForCommitOrFetch);
         }
         catch (UnavailableException e)
         {
@@ -1287,23 +1310,25 @@ public class StorageProxy implements StorageProxyMBean
             long latency = System.nanoTime() - start;
             readMetrics.addNano(latency);
             casReadMetrics.addNano(latency);
-            // TODO avoid giving every command the same latency number.  Can fix this in CASSADRA-5329
-            for (ReadCommand command : commands)
-                Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
+            Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
         }
 
-        return rows;
+        return result;
     }
 
-    private static List<Row> readRegular(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
+    @SuppressWarnings("resource")
+    private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         long start = System.nanoTime();
-        List<Row> rows = null;
-
         try
         {
-            rows = fetchRows(commands, consistencyLevel);
+            PartitionIterator result = fetchRows(group.commands, consistencyLevel);
+            // If we have more than one command, then despite each read command honoring the limit, the total result
+            // might not honor it and so we should enforce it
+            if (group.commands.size() > 1)
+                result = group.limits().filter(result, group.nowInSec());
+            return result;
         }
         catch (UnavailableException e)
         {
@@ -1325,11 +1350,9 @@ public class StorageProxy implements StorageProxyMBean
             long latency = System.nanoTime() - start;
             readMetrics.addNano(latency);
             // TODO avoid giving every command the same latency number.  Can fix this in CASSADRA-5329
-            for (ReadCommand command : commands)
-                Keyspace.open(command.ksName).getColumnFamilyStore(command.cfName).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
+            for (ReadCommand command : group.commands)
+                Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS);
         }
-
-        return rows;
     }
 
     /**
@@ -1343,218 +1366,144 @@ public class StorageProxy implements StorageProxyMBean
      * 4. If the digests (if any) match the data return the data
      * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static List<Row> fetchRows(List<ReadCommand> initialCommands, ConsistencyLevel consistencyLevel)
+    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand<?>> commands, ConsistencyLevel consistencyLevel)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
-        List<Row> rows = new ArrayList<>(initialCommands.size());
-        // (avoid allocating a new list in the common case of nothing-to-retry)
-        List<ReadCommand> commandsToRetry = Collections.emptyList();
+        int cmdCount = commands.size();
 
-        do
-        {
-            List<ReadCommand> commands = commandsToRetry.isEmpty() ? initialCommands : commandsToRetry;
-            AbstractReadExecutor[] readExecutors = new AbstractReadExecutor[commands.size()];
+        SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
+        for (int i = 0; i < cmdCount; i++)
+            reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
 
-            if (!commandsToRetry.isEmpty())
-                Tracing.trace("Retrying {} commands", commandsToRetry.size());
+        for (int i = 0; i < cmdCount; i++)
+            reads[i].doInitialQueries();
 
-            // send out read requests
-            for (int i = 0; i < commands.size(); i++)
-            {
-                ReadCommand command = commands.get(i);
-                assert !command.isDigestQuery();
+        for (int i = 0; i < cmdCount; i++)
+            reads[i].maybeTryAdditionalReplicas();
 
-                AbstractReadExecutor exec = AbstractReadExecutor.getReadExecutor(command, consistencyLevel);
-                exec.executeAsync();
-                readExecutors[i] = exec;
-            }
+        for (int i = 0; i < cmdCount; i++)
+            reads[i].awaitResultsAndRetryOnDigestMismatch();
 
-            for (AbstractReadExecutor exec : readExecutors)
-                exec.maybeTryAdditionalReplicas();
+        for (int i = 0; i < cmdCount; i++)
+            if (!reads[i].isDone())
+                reads[i].maybeAwaitFullDataRead();
 
-            // read results and make a second pass for any digest mismatches
-            List<ReadCommand> repairCommands = null;
-            List<ReadCallback<ReadResponse, Row>> repairResponseHandlers = null;
-            for (AbstractReadExecutor exec: readExecutors)
-            {
-                try
-                {
-                    Row row = exec.get();
-                    if (row != null)
-                    {
-                        exec.command.maybeTrim(row);
-                        rows.add(row);
-                    }
+        List<PartitionIterator> results = new ArrayList<>(cmdCount);
+        for (int i = 0; i < cmdCount; i++)
+        {
+            assert reads[i].isDone();
+            results.add(reads[i].getResult());
+        }
 
-                    if (logger.isDebugEnabled())
-                        logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - exec.handler.start));
-                }
-                catch (ReadTimeoutException|ReadFailureException ex)
-                {
-                    int blockFor = consistencyLevel.blockFor(Keyspace.open(exec.command.getKeyspace()));
-                    int responseCount = exec.handler.getReceivedCount();
-                    String gotData = responseCount > 0
-                                   ? exec.resolver.isDataPresent() ? " (including data)" : " (only digests)"
-                                   : "";
-
-                    boolean isTimeout = ex instanceof ReadTimeoutException;
-                    if (Tracing.isTracing())
-                    {
-                        Tracing.trace("{}; received {} of {} responses{}",
-                                      isTimeout ? "Timed out" : "Failed", responseCount, blockFor, gotData);
-                    }
-                    else if (logger.isDebugEnabled())
-                    {
-                        logger.debug("Read {}; received {} of {} responses{}", (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData);
-                    }
-                    throw ex;
-                }
-                catch (DigestMismatchException ex)
-                {
-                    Tracing.trace("Digest mismatch: {}", ex);
+        return PartitionIterators.concat(results);
+    }
 
-                    ReadRepairMetrics.repairedBlocking.mark();
+    private static class SinglePartitionReadLifecycle
+    {
+        private final SinglePartitionReadCommand<?> command;
+        private final AbstractReadExecutor executor;
+        private final ConsistencyLevel consistency;
 
-                    // Do a full data read to resolve the correct response (and repair node that need be)
-                    RowDataResolver resolver = new RowDataResolver(exec.command.ksName, exec.command.key, exec.command.filter(), exec.command.timestamp, exec.handler.endpoints.size());
-                    ReadCallback<ReadResponse, Row> repairHandler = new ReadCallback<>(resolver,
-                                                                                       ConsistencyLevel.ALL,
-                                                                                       exec.getContactedReplicas().size(),
-                                                                                       exec.command,
-                                                                                       Keyspace.open(exec.command.getKeyspace()),
-                                                                                       exec.handler.endpoints);
+        private PartitionIterator result;
+        private ReadCallback repairHandler;
 
-                    if (repairCommands == null)
-                    {
-                        repairCommands = new ArrayList<>();
-                        repairResponseHandlers = new ArrayList<>();
-                    }
-                    repairCommands.add(exec.command);
-                    repairResponseHandlers.add(repairHandler);
+        SinglePartitionReadLifecycle(SinglePartitionReadCommand<?> command, ConsistencyLevel consistency)
+        {
+            this.command = command;
+            this.executor = AbstractReadExecutor.getReadExecutor(command, consistency);
+            this.consistency = consistency;
+        }
 
-                    MessageOut<ReadCommand> message = exec.command.createMessage();
-                    for (InetAddress endpoint : exec.getContactedReplicas())
-                    {
-                        Tracing.trace("Enqueuing full data read to {}", endpoint);
-                        MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
-                    }
-                }
-            }
+        boolean isDone()
+        {
+            return result != null;
+        }
 
-            commandsToRetry.clear();
+        void doInitialQueries()
+        {
+            executor.executeAsync();
+        }
 
-            // read the results for the digest mismatch retries
-            if (repairResponseHandlers != null)
+        void maybeTryAdditionalReplicas()
+        {
+            executor.maybeTryAdditionalReplicas();
+        }
+
+        void awaitResultsAndRetryOnDigestMismatch() throws ReadFailureException, ReadTimeoutException
+        {
+            try
+            {
+                result = executor.get();
+            }
+            catch (DigestMismatchException ex)
             {
-                for (int i = 0; i < repairCommands.size(); i++)
+                Tracing.trace("Digest mismatch: {}", ex);
+
+                ReadRepairMetrics.repairedBlocking.mark();
+
+                // Do a full data read to resolve the correct response (and repair the nodes that need it)
+                Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+                DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size());
+                repairHandler = new ReadCallback(resolver,
+                                                 ConsistencyLevel.ALL,
+                                                 executor.getContactedReplicas().size(),
+                                                 command,
+                                                 keyspace,
+                                                 executor.handler.endpoints);
+
+                MessageOut<ReadCommand> message = command.createMessage();
+                for (InetAddress endpoint : executor.getContactedReplicas())
                 {
-                    ReadCommand command = repairCommands.get(i);
-                    ReadCallback<ReadResponse, Row> handler = repairResponseHandlers.get(i);
-
-                    Row row;
-                    try
-                    {
-                        row = handler.get();
-                    }
-                    catch (DigestMismatchException e)
-                    {
-                        throw new AssertionError(e); // full data requested from each node here, no digests should be sent
-                    }
-                    catch (ReadTimeoutException e)
-                    {
-                        if (Tracing.isTracing())
-                            Tracing.trace("Timed out waiting on digest mismatch repair requests");
-                        else
-                            logger.debug("Timed out waiting on digest mismatch repair requests");
-                        // the caught exception here will have CL.ALL from the repair command,
-                        // not whatever CL the initial command was at (CASSANDRA-7947)
-                        int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
-                        throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
-                    }
-
-                    RowDataResolver resolver = (RowDataResolver)handler.resolver;
-                    try
-                    {
-                        // wait for the repair writes to be acknowledged, to minimize impact on any replica that's
-                        // behind on writes in case the out-of-sync row is read multiple times in quick succession
-                        FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
-                    }
-                    catch (TimeoutException e)
-                    {
-                        if (Tracing.isTracing())
-                            Tracing.trace("Timed out waiting on digest mismatch repair acknowledgements");
-                        else
-                            logger.debug("Timed out waiting on digest mismatch repair acknowledgements");
-                        int blockFor = consistencyLevel.blockFor(Keyspace.open(command.getKeyspace()));
-                        throw new ReadTimeoutException(consistencyLevel, blockFor-1, blockFor, true);
-                    }
-
-                    // retry any potential short reads
-                    ReadCommand retryCommand = command.maybeGenerateRetryCommand(resolver, row);
-                    if (retryCommand != null)
-                    {
-                        Tracing.trace("Issuing retry for read command");
-                        if (commandsToRetry == Collections.EMPTY_LIST)
-                            commandsToRetry = new ArrayList<>();
-                        commandsToRetry.add(retryCommand);
-                        continue;
-                    }
-
-                    if (row != null)
-                    {
-                        command.maybeTrim(row);
-                        rows.add(row);
-                    }
+                    Tracing.trace("Enqueuing full data read to {}", endpoint);
+                    MessagingService.instance().sendRRWithFailure(message, endpoint, repairHandler);
                 }
             }
-        } while (!commandsToRetry.isEmpty());
-
-        return rows;
-    }
-
-    static class LocalReadRunnable extends DroppableRunnable
-    {
-        private final ReadCommand command;
-        private final ReadCallback<ReadResponse, Row> handler;
-        private final long start = System.nanoTime();
-
-        LocalReadRunnable(ReadCommand command, ReadCallback<ReadResponse, Row> handler)
-        {
-            super(MessagingService.Verb.READ);
-            this.command = command;
-            this.handler = handler;
         }
 
-        protected void runMayThrow()
+        void maybeAwaitFullDataRead() throws ReadTimeoutException
         {
+            // There wasn't a digest mismatch, we're good
+            if (repairHandler == null)
+                return;
+
+            // Otherwise, get the result from the full-data read and check that it's not a short read
             try
             {
-                Keyspace keyspace = Keyspace.open(command.ksName);
-                Row r = command.getRow(keyspace);
-                ReadResponse result = ReadVerbHandler.getResponse(command, r);
-                MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-                handler.response(result);
+                result = repairHandler.get();
             }
-            catch (Throwable t)
+            catch (DigestMismatchException e)
             {
-                handler.onFailure(FBUtilities.getBroadcastAddress());
-                if (t instanceof TombstoneOverwhelmingException)
-                    logger.error(t.getMessage());
+                throw new AssertionError(e); // full data requested from each node here, no digests should be sent
+            }
+            catch (ReadTimeoutException e)
+            {
+                if (Tracing.isTracing())
+                    Tracing.trace("Timed out waiting on digest mismatch repair requests");
                 else
-                    throw t;
+                    logger.debug("Timed out waiting on digest mismatch repair requests");
+                // the caught exception here will have CL.ALL from the repair command,
+                // not whatever CL the initial command was at (CASSANDRA-7947)
+                int blockFor = consistency.blockFor(Keyspace.open(command.metadata().ksName));
+                throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
             }
         }
+
+        PartitionIterator getResult()
+        {
+            assert result != null;
+            return result;
+        }
     }
 
-    static class LocalRangeSliceRunnable extends DroppableRunnable
+    static class LocalReadRunnable extends DroppableRunnable
     {
-        private final AbstractRangeCommand command;
-        private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
+        private final ReadCommand command;
+        private final ReadCallback handler;
         private final long start = System.nanoTime();
 
-        LocalRangeSliceRunnable(AbstractRangeCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+        LocalReadRunnable(ReadCommand command, ReadCallback handler)
         {
-            super(MessagingService.Verb.RANGE_SLICE);
+            super(MessagingService.Verb.READ);
             this.command = command;
             this.handler = handler;
         }
@@ -1563,9 +1512,11 @@ public class StorageProxy implements StorageProxyMBean
         {
             try
             {
-                RangeSliceReply result = new RangeSliceReply(command.executeLocally());
+                try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
+                {
+                    handler.response(command.createResponse(iterator));
+                }
                 MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-                handler.response(result);
             }
             catch (Throwable t)
             {
@@ -1583,7 +1534,7 @@ public class StorageProxy implements StorageProxyMBean
         return getLiveSortedEndpoints(keyspace, StorageService.getPartitioner().decorateKey(key));
     }
 
-    private static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos)
+    public static List<InetAddress> getLiveSortedEndpoints(Keyspace keyspace, RingPosition pos)
     {
         List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(keyspace, pos);
         DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
@@ -1603,307 +1554,322 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
-     * Estimate the number of result rows (either cql3 rows or storage rows, as called for by the command) per
+     * Estimate the number of result rows (either cql3 rows or "thrift" rows, as called for by the command) per
      * range in the ring based on our local data.  This assumes that ranges are uniformly distributed across the cluster
      * and that the queried data is also uniformly distributed.
      */
-    private static float estimateResultRowsPerRange(AbstractRangeCommand command, Keyspace keyspace)
+    private static float estimateResultsPerRange(PartitionRangeReadCommand command, Keyspace keyspace)
     {
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.columnFamily);
-        float resultRowsPerRange = Float.POSITIVE_INFINITY;
-        if (command.rowFilter != null && !command.rowFilter.isEmpty())
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
+        SecondaryIndexSearcher searcher = cfs.indexManager.getBestIndexSearcherFor(command);
+
+        float maxExpectedResults = searcher == null
+                                 ? command.limits().estimateTotalResults(cfs)
+                                 : searcher.highestSelectivityIndex(command.rowFilter()).estimateResultRows();
+
+        // adjust maxExpectedResults by the number of tokens this node has and the replication factor for this ks
+        return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
+    }
+
+    private static class RangeForQuery
+    {
+        public final AbstractBounds<PartitionPosition> range;
+        public final List<InetAddress> liveEndpoints;
+        public final List<InetAddress> filteredEndpoints;
+
+        public RangeForQuery(AbstractBounds<PartitionPosition> range, List<InetAddress> liveEndpoints, List<InetAddress> filteredEndpoints)
         {
-            List<SecondaryIndexSearcher> searchers = cfs.indexManager.getIndexSearchersForQuery(command.rowFilter);
-            if (searchers.isEmpty())
-            {
-                resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs);
-            }
-            else
-            {
-                // Secondary index query (cql3 or otherwise).  Estimate result rows based on most selective 2ary index.
-                for (SecondaryIndexSearcher searcher : searchers)
-                {
-                    // use our own mean column count as our estimate for how many matching rows each node will have
-                    SecondaryIndex highestSelectivityIndex = searcher.highestSelectivityIndex(command.rowFilter);
-                    resultRowsPerRange = Math.min(resultRowsPerRange, highestSelectivityIndex.estimateResultRows());
-                }
-            }
+            this.range = range;
+            this.liveEndpoints = liveEndpoints;
+            this.filteredEndpoints = filteredEndpoints;
         }
-        else if (!command.countCQL3Rows())
+    }
+
+    private static class RangeIterator extends AbstractIterator<RangeForQuery>
+    {
+        private final Keyspace keyspace;
+        private final ConsistencyLevel consistency;
+        private final Iterator<? extends AbstractBounds<PartitionPosition>> ranges;
+        private final int rangeCount;
+
+        public RangeIterator(PartitionRangeReadCommand command, Keyspace keyspace, ConsistencyLevel consistency)
         {
-            // non-cql3 query
-            resultRowsPerRange = cfs.estimateKeys();
+            this.keyspace = keyspace;
+            this.consistency = consistency;
+
+            List<? extends AbstractBounds<PartitionPosition>> l = keyspace.getReplicationStrategy() instanceof LocalStrategy
+                                                          ? command.dataRange().keyRange().unwrap()
+                                                          : getRestrictedRanges(command.dataRange().keyRange());
+            this.ranges = l.iterator();
+            this.rangeCount = l.size();
         }
-        else
+
+        public int rangeCount()
         {
-            resultRowsPerRange = calculateResultRowsUsingEstimatedKeys(cfs);
+            return rangeCount;
         }
 
-        // adjust resultRowsPerRange by the number of tokens this node has and the replication factor for this ks
-        return (resultRowsPerRange / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor();
+        protected RangeForQuery computeNext()
+        {
+            if (!ranges.hasNext())
+                return endOfData();
+
+            AbstractBounds<PartitionPosition> range = ranges.next();
+            List<InetAddress> liveEndpoints = getLiveSortedEndpoints(keyspace, range.right);
+            return new RangeForQuery(range,
+                                     liveEndpoints,
+                                     consistency.filterForQuery(keyspace, liveEndpoints));
+        }
     }
 
-    private static float calculateResultRowsUsingEstimatedKeys(ColumnFamilyStore cfs)
+    private static class RangeMerger extends AbstractIterator<RangeForQuery>
     {
-        if (cfs.metadata.comparator.isDense())
+        private final Keyspace keyspace;
+        private final ConsistencyLevel consistency;
+        private final PeekingIterator<RangeForQuery> ranges;
+
+        private RangeMerger(Iterator<RangeForQuery> iterator, Keyspace keyspace, ConsistencyLevel consistency)
         {
-            // one storage row per result row, so use key estimate directly
-            return cfs.estimateKeys();
+            this.keyspace = keyspace;
+            this.consistency = consistency;
+            this.ranges = Iterators.peekingIterator(iterator);
         }
-        else
+
+        protected RangeForQuery computeNext()
         {
-            float resultRowsPerStorageRow = ((float) cfs.getMeanColumns()) / cfs.metadata.regularColumns().size();
-            return resultRowsPerStorageRow * (cfs.estimateKeys());
+            if (!ranges.hasNext())
+                return endOfData();
+
+            RangeForQuery current = ranges.next();
+
+            // getRestrictedRanges has broken the queried range into per-[vnode] token ranges, but this doesn't take
+            // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
+            // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
+            while (ranges.hasNext())
+            {
+                // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
+                // doesn't know how to deal with a wrapping range.
+                // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwrap
+                // the range if necessary and deal with it. However, we can't start sending wrapped ranges without breaking
+                // wire compatibility, so it's likely easier not to bother.
+                if (current.range.right.isMinimum())
+                    break;
+
+                RangeForQuery next = ranges.peek();
+
+                List<InetAddress> merged = intersection(current.liveEndpoints, next.liveEndpoints);
+
+                // Check if there are enough endpoints for the merge to be possible.
+                if (!consistency.isSufficientLiveNodes(keyspace, merged))
+                    break;
+
+                List<InetAddress> filteredMerged = consistency.filterForQuery(keyspace, merged);
+
+                // Estimate whether merging will be a win or not
+                if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.filteredEndpoints, next.filteredEndpoints))
+                    break;
+
+                // If we get there, merge this range and the next one
+                current = new RangeForQuery(current.range.withNewRight(next.range.right), merged, filteredMerged);
+                ranges.next(); // consume the range we just merged since we've only peeked so far
+            }
+            return current;
         }
     }
 
-    public static List<Row> getRangeSlice(AbstractRangeCommand command, ConsistencyLevel consistency_level)
-    throws UnavailableException, ReadFailureException, ReadTimeoutException
+    private static class SingleRangeResponse extends AbstractIterator<RowIterator> implements PartitionIterator
     {
-        Tracing.trace("Computing ranges to query");
-        long startTime = System.nanoTime();
+        private final ReadCallback handler;
+        private PartitionIterator result;
 
-        Keyspace keyspace = Keyspace.open(command.keyspace);
-        List<Row> rows;
-        // now scan until we have enough results
-        try
+        private SingleRangeResponse(ReadCallback handler)
         {
-            int liveRowCount = 0;
-            boolean countLiveRows = command.countCQL3Rows() || command.ignoredTombstonedPartitions();
-            rows = new ArrayList<>();
+            this.handler = handler;
+        }
 
-            // when dealing with LocalStrategy keyspaces, we can skip the range splitting and merging (which can be
-            // expensive in clusters with vnodes)
-            List<? extends AbstractBounds<RowPosition>> ranges;
-            if (keyspace.getReplicationStrategy() instanceof LocalStrategy)
-                ranges = command.keyRange.unwrap();
-            else
-                ranges = getRestrictedRanges(command.keyRange);
+        private void waitForResponse() throws ReadTimeoutException
+        {
+            if (result != null)
+                return;
 
-            // determine the number of rows to be fetched and the concurrency factor
-            int rowsToBeFetched = command.limit();
-            int concurrencyFactor;
-            if (command.requiresScanningAllRanges())
+            try
             {
-                // all nodes must be queried
-                rowsToBeFetched *= ranges.size();
-                concurrencyFactor = ranges.size();
-                logger.debug("Requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                             command.limit(),
-                             ranges.size(),
-                             concurrencyFactor);
-                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {}",
-                              ranges.size(), concurrencyFactor);
+                result = handler.get();
             }
-            else
+            catch (DigestMismatchException e)
             {
-                // our estimate of how many result rows there will be per-range
-                float resultRowsPerRange = estimateResultRowsPerRange(command, keyspace);
-                // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
-                // fetch enough rows in the first round
-                resultRowsPerRange -= resultRowsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
-                concurrencyFactor = resultRowsPerRange == 0.0
-                                  ? 1
-                                  : Math.max(1, Math.min(ranges.size(), (int) Math.ceil(command.limit() / resultRowsPerRange)));
-
-                logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
-                             resultRowsPerRange,
-                             command.limit(),
-                             ranges.size(),
-                             concurrencyFactor);
-                Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)",
-                              ranges.size(),
-                              concurrencyFactor,
-                              resultRowsPerRange);
+                throw new AssertionError(e); // no digests in range slices yet
             }
+        }
+
+        protected RowIterator computeNext()
+        {
+            waitForResponse();
+            return result.hasNext() ? result.next() : endOfData();
+        }
+
+        public void close()
+        {
+            if (result != null)
+                result.close();
+        }
+    }
+
+    private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
+    {
+        private final Iterator<RangeForQuery> ranges;
+        private final int totalRangeCount;
+        private final PartitionRangeReadCommand command;
+        private final Keyspace keyspace;
+        private final ConsistencyLevel consistency;
 
-            boolean haveSufficientRows = false;
-            int i = 0;
-            AbstractBounds<RowPosition> nextRange = null;
-            List<InetAddress> nextEndpoints = null;
-            List<InetAddress> nextFilteredEndpoints = null;
-            while (i < ranges.size())
+        private final long startTime;
+        private CountingPartitionIterator sentQueryIterator;
+
+        private int concurrencyFactor;
+        // The following two metrics are maintained to improve the concurrencyFactor
+        // when it was not good enough initially.
+        private int liveReturned;
+        private int rangesQueried;
+
+        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+        {
+            this.command = command;
+            this.concurrencyFactor = concurrencyFactor;
+            this.startTime = System.nanoTime();
+            this.ranges = new RangeMerger(ranges, keyspace, consistency);
+            this.totalRangeCount = ranges.rangeCount();
+            this.consistency = consistency;
+            this.keyspace = keyspace;
+        }
+
+        public RowIterator computeNext()
+        {
+            while (sentQueryIterator == null || !sentQueryIterator.hasNext())
             {
-                List<Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>>> scanHandlers = new ArrayList<>(concurrencyFactor);
-                int concurrentFetchStartingIndex = i;
-                int concurrentRequests = 0;
-                while ((i - concurrentFetchStartingIndex) < concurrencyFactor)
+                // If we don't have any more ranges to handle, we're done
+                if (!ranges.hasNext())
+                    return endOfData();
+
+                // else, send the next batch of concurrent queries (after having closed the previous iterator)
+                if (sentQueryIterator != null)
                 {
-                    AbstractBounds<RowPosition> range = nextRange == null
-                                                      ? ranges.get(i)
-                                                      : nextRange;
-                    List<InetAddress> liveEndpoints = nextEndpoints == null
-                                                    ? getLiveSortedEndpoints(keyspace, range.right)
-                                                    : nextEndpoints;
-                    List<InetAddress> filteredEndpoints = nextFilteredEndpoints == null
-                                                        ? consistency_level.filterForQuery(keyspace, liveEndpoints)
-                                                        : nextFilteredEndpoints;
-                    ++i;
-                    ++concurrentRequests;
-
-                    // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
-                    // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
-                    // still meets the CL requirements, then we can merge both ranges into the same RangeSliceCommand.
-                    while (i < ranges.size())
-                    {
-                        nextRange = ranges.get(i);
-                        nextEndpoints = getLiveSortedEndpoints(keyspace, nextRange.right);
-                        nextFilteredEndpoints = consistency_level.filterForQuery(keyspace, nextEndpoints);
-
-                        // If the current range right is the min token, we should stop merging because CFS.getRangeSlice
-                        // don't know how to deal with a wrapping range.
-                        // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
-                        // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
-                        // wire compatibility, so It's likely easier not to bother;
-                        if (range.right.isMinimum())
-                            break;
-
-                        List<InetAddress> merged = intersection(liveEndpoints, nextEndpoints);
-
-                        // Check if there is enough endpoint for the merge to be possible.
-                        if (!consistency_level.isSufficientLiveNodes(keyspace, merged))
-                            break;
-
-                        List<InetAddress> filteredMerged = consistency_level.filterForQuery(keyspace, merged);
-
-                        // Estimate whether merging will be a win or not
-                        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, filteredEndpoints, nextFilteredEndpoints))
-                            break;
-
-                        // If we get there, merge this range and the next one
-                        range = range.withNewRight(nextRange.right);
-                        liveEndpoints = merged;
-                        filteredEndpoints = filteredMerged;
-                        ++i;
-                    }
+                    liveReturned += sentQueryIterator.counter().counted();
+                    sentQueryIterator.close();
 
-                    AbstractRangeCommand nodeCmd = command.forSubRange(range);
-
-                    // collect replies and resolve according to consistency level
-                    RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace, command.timestamp);
-                    List<InetAddress> minimalEndpoints = filteredEndpoints.subList(0, Math.min(filteredEndpoints.size(), consistency_level.blockFor(keyspace)));
-                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = new ReadCallback<>(resolver, consistency_level, nodeCmd, minimalEndpoints);
-                    handler.assureSufficientLiveNodes();
-                    resolver.setSources(filteredEndpoints);
-                    if (filteredEndpoints.size() == 1
-                        && filteredEndpoints.get(0).equals(FBUtilities.getBroadcastAddress())
-                        && OPTIMIZE_LOCAL_REQUESTS)
-                    {
-                        StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler), Tracing.instance.get());
-                    }
-                    else
-                    {
-                        MessageOut<? extends AbstractRangeCommand> message = nodeCmd.createMessage();
-                        for (InetAddress endpoint : filteredEndpoints)
-                        {
-                            Tracing.trace("Enqueuing request to {}", endpoint);
-                            MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
-                        }
-                    }
-                    scanHandlers.add(Pair.create(nodeCmd, handler));
+                    // It's not the first batch of queries and we're not done, so we can use what has been
+                    // returned so far to improve our rows-per-range estimate and update the concurrency accordingly
+                    updateConcurrencyFactor();
                 }
-                Tracing.trace("Submitted {} concurrent range requests covering {} ranges", concurrentRequests, i - concurrentFetchStartingIndex);
+                sentQueryIterator = sendNextRequests();
+            }
 
-                List<AsyncOneResponse> repairResponses = new ArrayList<>();
-                for (Pair<AbstractRangeCommand, ReadCallback<RangeSliceReply, Iterable<Row>>> cmdPairHandler : scanHandlers)
-                {
-                    ReadCallback<RangeSliceReply, Iterable<Row>> handler = cmdPairHandler.right;
-                    RangeSliceResponseResolver resolver = (RangeSliceResponseResolver)handler.resolver;
+            return sentQueryIterator.next();
+        }
 
-                    try
-                    {
-                        for (Row row : handler.get())
-                        {
-                            rows.add(row);
-                            if (countLiveRows)
-                                liveRowCount += row.getLiveCount(command.predicate, command.timestamp);
-                        }
-                        repairResponses.addAll(resolver.repairResults);
-                    }
-                    catch (ReadTimeoutException|ReadFailureException ex)
-                    {
-                        // we timed out or failed waiting for responses
-                        int blockFor = consistency_level.blockFor(keyspace);
-                        int responseCount = resolver.responses.size();
-                        String gotData = responseCount > 0
-                                         ? resolver.isDataPresent() ? " (including data)" : " (only digests)"
-                                         : "";
-
-                        boolean isTimeout = ex instanceof ReadTimeoutException;
-                        if (Tracing.isTracing())
-                        {
-                            Tracing.trace("{}; received {} of {} responses{} for range {} of {}",
-                                          (isTimeout ? "Timed out" : "Failed"), responseCount, blockFor, gotData, i, ranges.size());
-                        }
-                        else if (logger.isDebugEnabled())
-                        {
-                            logger.debug("Range slice {}; received {} of {} responses{} for range {} of {}",
-                                         (isTimeout ? "timeout" : "failure"), responseCount, blockFor, gotData, i, ranges.size());
-                        }
-                        throw ex;
-                    }
-                    catch (DigestMismatchException e)
-                    {
-                        throw new AssertionError(e); // no digests in range slices yet
-                    }
+        private void updateConcurrencyFactor()
+        {
+            if (liveReturned == 0)
+            {
+                // we haven't actually gotten any results, so query all remaining ranges at once
+                concurrencyFactor = totalRangeCount - rangesQueried;
+                return;
+            }
 
-                    // if we're done, great, otherwise, move to the next range
-                    int count = countLiveRows ? liveRowCount : rows.size();
-                    if (count >= rowsToBeFetched)
-                    {
-                        haveSufficientRows = true;
-                        break;
-                    }
-                }
+            // Otherwise, compute how many rows per range we got on average and pick a concurrency factor
+            // that should allow us to fetch all remaining rows with the next batch of (concurrent) queries.
+            int remainingRows = command.limits().count() - liveReturned;
+            float rowsPerRange = (float)liveReturned / (float)rangesQueried;
+            concurrencyFactor = Math.max(1, Math.min(totalRangeCount - rangesQueried, Math.round(remainingRows / rowsPerRange)));
+            logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
+                         rowsPerRange, (int) remainingRows, concurrencyFactor);
+        }
 
-                try
-                {
-                    FBUtilities.waitOnFutures(repairResponses, DatabaseDescriptor.getWriteRpcTimeout());
-                }
-                catch (TimeoutException ex)
-                {
-                    // We got all responses, but timed out while repairing
-                    int blockFor = consistency_level.blockFor(keyspace);
-                    if (Tracing.isTracing())
-                        Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
-                    else
-                        logger.debug("Range slice timeout while read-repairing after receiving all {} data and digest responses", blockFor);
-                    throw new ReadTimeoutException(consistency_level, blockFor-1, blockFor, true);
-                }
+        private SingleRangeResponse query(RangeForQuery toQuery)
+        {
+            PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range);
+
+            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
 
-                if (haveSufficientRows)
-                    return command.postReconciliationProcessing(rows);
+            int blockFor = consistency.blockFor(keyspace);
+            int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
+            List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
+            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
 
-                // we didn't get enough rows in our concurrent fetch; recalculate our concurrency factor
-                // based on the results we've seen so far (as long as we still have ranges left to query)
-                if (i < ranges.size())
+            handler.assureSufficientLiveNodes();
+
+            if (toQuery.filteredEndpoints.size() == 1 && canDoLocalRequest(toQuery.filteredEndpoints.get(0)))
+            {
+                StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler), Tracing.instance.get());
+            }
+            else
+            {
+                MessageOut<ReadCommand> message = rangeCommand.createMessage();
+                for (InetAddress endpoint : toQuery.filteredEndpoints)
                 {
-                    float fetchedRows = countLiveRows ? liveRowCount : rows.size();
-                    float remainingRows = rowsToBeFetched - fetchedRows;
-                    float actualRowsPerRange;
-                    if (fetchedRows == 0.0)
-                    {
-                        // we haven't actually gotten any results, so query all remaining ranges at once
-                        actualRowsPerRange = 0.0f;
-                        concurrencyFactor = ranges.size() - i;
-                    }
-                    else
-                    {
-                        actualRowsPerRange = fetchedRows / i;
-                        concurrencyFactor = Math.max(1, Math.min(ranges.size() - i, Math.round(remainingRows / actualRowsPerRange)));
-                    }
-                    logger.debug("Didn't get enough response rows; actual rows per range: {}; remaining rows: {}, new concurrent requests: {}",
-                                 actualRowsPerRange, (int) remainingRows, concurrencyFactor);
+                    Tracing.trace("Enqueuing request to {}", endpoint);
+                    MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
                 }
             }
+
+            return new SingleRangeResponse(handler);
         }
-        finally
+
+        private CountingPartitionIterator sendNextRequests()
         {
-            long latency = System.nanoTime() - startTime;
-            rangeMetrics.addNano(latency);
-            Keyspace.open(command.keyspace).getColumnFamilyStore(command.columnFamily).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+            List<PartitionIterator> concurrentQueries = new ArrayList<>(concurrencyFactor);
+            for (int i = 0; i < concurrencyFactor && ranges.hasNext(); i++)
+            {
+                concurrentQueries.add(query(ranges.next()));
+                ++rangesQueried;
+            }
+
+            Tracing.trace("Submitted {} concurrent range requests", concurrentQueries.size());
+            return new CountingPartitionIterator(PartitionIterators.concat(concurrentQueries), command.limits(), command.nowInSec());
         }
-        return command.postReconciliationProcessing(rows);
+
+        public void close()
+        {
+            try
+            {
+                if (sentQueryIterator != null)
+                    sentQueryIterator.close();
+            }
+            finally
+            {
+                long latency = System.nanoTime() - startTime;
+                rangeMetrics.addNano(latency);
+                Keyspace.openAndGetStore(command.metadata()).metric.coordinatorScanLatency.update(latency, TimeUnit.NANOSECONDS);
+            }
+        }
+    }
+
+    @SuppressWarnings("resource")
+    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
+    throws UnavailableException, ReadFailureException, ReadTimeoutException
+    {
+        Tracing.trace("Computing ranges to query");
+        long startTime = System.nanoTime();
+
+        List<FilteredPartition> partitions = new ArrayList<>();
+
+        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+        RangeIterator ranges = new RangeIterator(command, keyspace, consistencyLevel);
+
+        // our estimate of how many result rows there will be per-range
+        float resultsPerRange = estimateResultsPerRange(command, keyspace);
+        // underestimate how many rows we will get per-range in order to increase the likelihood that we'll
+        // fetch enough rows in the first round
+        resultsPerRange -= resultsPerRange * CONCURRENT_SUBREQUESTS_MARGIN;
+        int concurrencyFactor = resultsPerRange == 0.0
+                              ? 1
+                              : Math.max(1, Math.min(ranges.rangeCount(), (int) Math.ceil(command.limits().count() / resultsPerRange)));
+        logger.debug("Estimated result rows per range: {}; requested rows: {}, ranges.size(): {}; concurrent range requests: {}",
+                     resultsPerRange, command.limits().count(), ranges.rangeCount(), concurrencyFactor);
+        Tracing.trace("Submitting range requests on {} ranges with a concurrency of {} ({} rows per range expected)", ranges.rangeCount(), concurrencyFactor, resultsPerRange);
+
+        // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
+        return command.postReconciliationProcessing(command.limits().filter(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel), command.nowInSec()));
     }
 
     public Map<String, List<String>> getSchemaVersions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 25ed849..93421ce 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -67,21 +67,7 @@ import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.BatchlogManager;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.CounterMutationVerbHandler;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DefinitionsUpdateVerbHandler;
-import org.apache.cassandra.db.HintedHandOffManager;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.MigrationRequestVerbHandler;
-import org.apache.cassandra.db.MutationVerbHandler;
-import org.apache.cassandra.db.ReadRepairVerbHandler;
-import org.apache.cassandra.db.ReadVerbHandler;
-import org.apache.cassandra.db.SchemaCheckVerbHandler;
-import org.apache.cassandra.db.SnapshotDetailsTabularData;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.TruncateVerbHandler;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.BootStrapper;
@@ -311,9 +297,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         /* register the verb handlers */
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new RangeSliceVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new RangeSliceVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAGED_RANGE, new ReadCommandVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.COUNTER_MUTATION, new CounterMutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.TRUNCATE, new TruncateVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.PAXOS_PREPARE, new PrepareVerbHandler());
@@ -3901,7 +3887,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     // Never ever do this at home. Used by tests.
-    IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
+    @VisibleForTesting
+    public IPartitioner setPartitionerUnsafe(IPartitioner newPartitioner)
     {
         IPartitioner oldPartitioner = DatabaseDescriptor.getPartitioner();
         DatabaseDescriptor.setPartitioner(newPartitioner);


Mime
View raw message