cassandra-commits mailing list archives

From slebre...@apache.org
Subject [13/51] [partial] cassandra git commit: Storage engine refactor, a.k.a CASSANDRA-8099
Date Tue, 30 Jun 2015 10:47:37 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3aab12f..6696e10 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -29,14 +29,13 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData.SpeculativeRetry.RetryType;
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.config.ReadRepairDecision;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -62,22 +61,15 @@ public abstract class AbstractReadExecutor
 
     protected final ReadCommand command;
     protected final List<InetAddress> targetReplicas;
-    protected final RowDigestResolver resolver;
-    protected final ReadCallback<ReadResponse, Row> handler;
+    protected final ReadCallback handler;
     protected final TraceState traceState;
 
-    AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
-        resolver = new RowDigestResolver(command.ksName, command.key, targetReplicas.size());
-        traceState = Tracing.instance.get();
-        handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
-    }
-
-    private static boolean isLocalRequest(InetAddress replica)
-    {
-        return replica.equals(FBUtilities.getBroadcastAddress()) && StorageProxy.OPTIMIZE_LOCAL_REQUESTS;
+        this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas);
+        this.traceState = Tracing.instance.get();
     }
 
     protected void makeDataRequests(Iterable<InetAddress> endpoints)
@@ -98,7 +90,7 @@ public abstract class AbstractReadExecutor
 
         for (InetAddress endpoint : endpoints)
         {
-            if (isLocalRequest(endpoint))
+            if (StorageProxy.canDoLocalRequest(endpoint))
             {
                 hasLocalEndpoint = true;
                 continue;
@@ -142,7 +134,7 @@ public abstract class AbstractReadExecutor
      * wait for an answer.  Blocks until success or timeout, so it is the caller's
      * responsibility to call maybeTryAdditionalReplicas first.
      */
-    public Row get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
     {
         return handler.get();
     }
@@ -150,11 +142,11 @@ public abstract class AbstractReadExecutor
     /**
      * @return an executor appropriate for the configured speculative read policy
      */
-    public static AbstractReadExecutor getReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
+    public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
     {
-        Keyspace keyspace = Keyspace.open(command.ksName);
-        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.key);
-        ReadRepairDecision repairDecision = Schema.instance.getCFMetaData(command.ksName, command.cfName).newReadRepairDecision();
+        Keyspace keyspace = Keyspace.open(command.metadata().ksName);
+        List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+        ReadRepairDecision repairDecision = command.metadata().newReadRepairDecision();
         List<InetAddress> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
 
         // Throw UAE early if we don't have enough replicas.
@@ -166,19 +158,19 @@ public abstract class AbstractReadExecutor
             ReadRepairMetrics.attempted.mark();
         }
 
-        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().cfId);
         RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
 
         // Speculative retry is disabled *OR* there are simply no extra replicas to speculate.
         if (retryType == RetryType.NONE || consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
+            return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas);
 
         if (targetReplicas.size() == allReplicas.size())
         {
             // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
             // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
             // (same number of requests in total, but we turn 1 digest request into a full-blown data request).
-            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
         }
 
         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
@@ -199,16 +191,16 @@ public abstract class AbstractReadExecutor
         targetReplicas.add(extraReplica);
 
         if (retryType == RetryType.ALWAYS)
-            return new AlwaysSpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(cfs, command, consistencyLevel, targetReplicas);
+            return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
     }
 
     private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public NeverSpeculatingReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
         }
 
         public void executeAsync()
@@ -234,12 +226,13 @@ public abstract class AbstractReadExecutor
         private final ColumnFamilyStore cfs;
         private volatile boolean speculated = false;
 
-        public SpeculatingReadExecutor(ColumnFamilyStore cfs,
+        public SpeculatingReadExecutor(Keyspace keyspace,
+                                       ColumnFamilyStore cfs,
                                        ReadCommand command,
                                        ConsistencyLevel consistencyLevel,
                                        List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
             this.cfs = cfs;
         }
 
@@ -278,7 +271,7 @@ public abstract class AbstractReadExecutor
             {
                 // Could be waiting on the data, or on enough digests.
                 ReadCommand retryCommand = command;
-                if (resolver.getData() != null)
+                if (handler.resolver.isDataPresent())
                     retryCommand = command.copy().setIsDigestQuery(true);
 
                 InetAddress extraReplica = Iterables.getLast(targetReplicas);
@@ -304,12 +297,13 @@ public abstract class AbstractReadExecutor
     {
         private final ColumnFamilyStore cfs;
 
-        public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
+        public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
+                                             ColumnFamilyStore cfs,
                                              ReadCommand command,
                                              ConsistencyLevel consistencyLevel,
                                              List<InetAddress> targetReplicas)
         {
-            super(command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas);
             this.cfs = cfs;
         }
 

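The branching in getReadExecutor() above is the heart of the speculative-retry machinery. A minimal, self-contained restatement of that selection logic (RetryType, ExecutorKind and the bare replica counts are simplified stand-ins, not the real Cassandra API):

    // Simplified sketch of getReadExecutor()'s decision tree; stand-in types only.
    enum RetryType { NONE, ALWAYS, PERCENTILE, CUSTOM }
    enum ExecutorKind { NEVER_SPECULATING, ALWAYS_SPECULATING, SPECULATING }

    final class ExecutorChooser
    {
        static ExecutorKind choose(RetryType retryType, int blockFor, int allReplicas, int targetReplicas)
        {
            // Speculation disabled, or no extra replicas to speculate with.
            if (retryType == RetryType.NONE || blockFor == allReplicas)
                return ExecutorKind.NEVER_SPECULATING;

            // Already contacting every replica: turn one digest request into a
            // second full data request up front instead of speculating later.
            if (targetReplicas == allReplicas)
                return ExecutorKind.ALWAYS_SPECULATING;

            // Otherwise the caller adds one extra replica to the target list, then:
            return retryType == RetryType.ALWAYS ? ExecutorKind.ALWAYS_SPECULATING
                                                 : ExecutorKind.SPECULATING; // PERCENTILE or CUSTOM
        }
    }

Same inputs, same outcome as the diff; only the plumbing (keyspace, handler, tracing) is stripped away.
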
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
deleted file mode 100644
index f362047..0000000
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.concurrent.Accumulator;
-
-public abstract class AbstractRowResolver implements IResponseResolver<ReadResponse, Row>
-{
-    protected static final Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
-
-    protected final String keyspaceName;
-    // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
-    protected final Accumulator<MessageIn<ReadResponse>> replies;
-    protected final DecoratedKey key;
-
-    public AbstractRowResolver(ByteBuffer key, String keyspaceName, int maxResponseCount)
-    {
-        this.key = StorageService.getPartitioner().decorateKey(key);
-        this.keyspaceName = keyspaceName;
-        this.replies = new Accumulator<>(maxResponseCount);
-    }
-
-    public void preprocess(MessageIn<ReadResponse> message)
-    {
-        replies.add(message);
-    }
-
-    public Iterable<MessageIn<ReadResponse>> getMessages()
-    {
-        return replies;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
index 6ac765b..dec5319 100644
--- a/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
+++ b/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
@@ -29,11 +29,11 @@ import org.apache.cassandra.utils.WrappedRunnable;
 
 public class AsyncRepairCallback implements IAsyncCallback<ReadResponse>
 {
-    private final RowDataResolver repairResolver;
+    private final DataResolver repairResolver;
     private final int blockfor;
     protected final AtomicInteger received = new AtomicInteger(0);
 
-    public AsyncRepairCallback(RowDataResolver repairResolver, int blockfor)
+    public AsyncRepairCallback(DataResolver repairResolver, int blockfor)
     {
         this.repairResolver = repairResolver;
         this.blockfor = blockfor;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/CASRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java
index 3d86637..1db100d 100644
--- a/src/java/org/apache/cassandra/service/CASRequest.java
+++ b/src/java/org/apache/cassandra/service/CASRequest.java
@@ -17,8 +17,9 @@
  */
 package org.apache.cassandra.service;
 
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 
 /**
@@ -27,19 +28,19 @@ import org.apache.cassandra.exceptions.InvalidRequestException;
 public interface CASRequest
 {
     /**
-     * The filter to use to fetch the value to compare for the CAS.
+     * The command to use to fetch the value to compare for the CAS.
      */
-    public IDiskAtomFilter readFilter();
+    public SinglePartitionReadCommand readCommand(int nowInSec);
 
     /**
      * Returns whether the provided CF, which represents the values fetched using the
      * readCommand(), matches the CAS conditions this object stands for.
      */
-    public boolean appliesTo(ColumnFamily current) throws InvalidRequestException;
+    public boolean appliesTo(FilteredPartition current) throws InvalidRequestException;
 
     /**
      * The updates to perform on a CAS success. The values fetched using the
      * readCommand() are passed as argument.
      */
-    public ColumnFamily makeUpdates(ColumnFamily current) throws InvalidRequestException;
+    public PartitionUpdate makeUpdates(FilteredPartition current) throws InvalidRequestException;
 }

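The reworked CASRequest keeps the same three-step shape: fetch the current partition state, test the conditions against it, and only then build the update. A hedged, self-contained analogue of that contract, using plain stand-in types in place of SinglePartitionReadCommand, FilteredPartition and PartitionUpdate:

    import java.util.Map;
    import java.util.Optional;

    // Generic read/check/apply shape of a CAS round; every type here is a
    // hypothetical stand-in, not the Cassandra interface itself.
    interface CasRequestSketch<K, V>
    {
        K keyToRead();                           // ~ readCommand(nowInSec)
        boolean appliesTo(Optional<V> current);  // ~ appliesTo(FilteredPartition)
        V makeUpdate(Optional<V> current);       // ~ makeUpdates(FilteredPartition)
    }

    final class CasExecutor
    {
        // Applies the request atomically against an in-memory map; Paxos in
        // StorageProxy performs the equivalent sequence across replicas.
        static <K, V> boolean apply(Map<K, V> store, CasRequestSketch<K, V> req)
        {
            synchronized (store)
            {
                Optional<V> current = Optional.ofNullable(store.get(req.keyToRead()));
                if (!req.appliesTo(current))
                    return false;                // conditions not met, nothing written
                store.put(req.keyToRead(), req.makeUpdate(current));
                return true;
            }
        }
    }
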
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index a775627..e82e8a4 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -43,17 +43,20 @@ import org.apache.cassandra.cache.AutoSavingCache.CacheSerializer;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition;
+import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 public class CacheService implements CacheServiceMBean
 {
@@ -362,24 +365,45 @@ public class CacheService implements CacheServiceMBean
         public Future<Pair<CounterCacheKey, ClockAndCount>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer partitionKey = ByteBufferUtil.readWithLength(in);
-            final CellName cellName = cfs.metadata.comparator.cellFromByteBuffer(ByteBufferUtil.readWithLength(in));
+            final ByteBuffer cellName = ByteBufferUtil.readWithLength(in);
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<CounterCacheKey, ClockAndCount>>()
             {
                 public Pair<CounterCacheKey, ClockAndCount> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(partitionKey);
-                    QueryFilter filter = QueryFilter.getNamesFilter(key,
-                                                                    cfs.metadata.cfName,
-                                                                    FBUtilities.singleton(cellName, cfs.metadata.comparator),
-                                                                    Long.MIN_VALUE);
-                    ColumnFamily cf = cfs.getTopLevelColumns(filter, Integer.MIN_VALUE);
-                    if (cf == null)
-                        return null;
-                    Cell cell = cf.getColumn(cellName);
-                    if (cell == null || !cell.isLive(Long.MIN_VALUE))
-                        return null;
-                    ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
-                    return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, cellName), clockAndCount);
+                    LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(cfs.metadata, cellName);
+                    ColumnDefinition column = name.column;
+                    CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement);
+
+                    int nowInSec = FBUtilities.nowInSeconds();
+                    ColumnFilter.Builder builder = ColumnFilter.selectionBuilder();
+                    if (path == null)
+                        builder.add(column);
+                    else
+                        builder.select(column, path);
+
+                    ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(FBUtilities.<Clustering>singleton(name.clustering, cfs.metadata.comparator), false);
+                    SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(cfs.metadata, nowInSec, key, builder.build(), filter);
+                    try (OpOrder.Group op = cfs.readOrdering.start(); RowIterator iter = UnfilteredRowIterators.filter(cmd.queryMemtableAndDisk(cfs, op), nowInSec))
+                    {
+                        Cell cell;
+                        if (column.isStatic())
+                        {
+                            cell = iter.staticRow().getCell(column);
+                        }
+                        else
+                        {
+                            if (!iter.hasNext())
+                                return null;
+                            cell = iter.next().getCell(column);
+                        }
+
+                        if (cell == null)
+                            return null;
+
+                        ClockAndCount clockAndCount = CounterContext.instance().getLocalClockAndCount(cell.value());
+                        return Pair.create(CounterCacheKey.create(cfs.metadata.cfId, partitionKey, name.clustering, column, path), clockAndCount);
+                    }
                 }
             });
         }
@@ -395,14 +419,19 @@ public class CacheService implements CacheServiceMBean
         public Future<Pair<RowCacheKey, IRowCacheEntry>> deserialize(DataInputStream in, final ColumnFamilyStore cfs) throws IOException
         {
             final ByteBuffer buffer = ByteBufferUtil.readWithLength(in);
+            final int rowsToCache = cfs.metadata.getCaching().rowCache.rowsToCache;
+
             return StageManager.getStage(Stage.READ).submit(new Callable<Pair<RowCacheKey, IRowCacheEntry>>()
             {
                 public Pair<RowCacheKey, IRowCacheEntry> call() throws Exception
                 {
                     DecoratedKey key = cfs.partitioner.decorateKey(buffer);
-                    QueryFilter cacheFilter = new QueryFilter(key, cfs.getColumnFamilyName(), cfs.readFilterForCache(), Integer.MIN_VALUE);
-                    ColumnFamily data = cfs.getTopLevelColumns(cacheFilter, Integer.MIN_VALUE);
-                    return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry) data);
+                    int nowInSec = FBUtilities.nowInSeconds();
+                    try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
+                    {
+                        CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
+                        return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache);
+                    }
                 }
             });
         }
@@ -423,7 +452,7 @@ public class CacheService implements CacheServiceMBean
             ByteBufferUtil.writeWithLength(key.key, out);
             out.writeInt(key.desc.generation);
             out.writeBoolean(true);
-            key.desc.getFormat().getIndexSerializer(cfm).serialize(entry, out);
+            key.desc.getFormat().getIndexSerializer(cfm, key.desc.version, SerializationHeader.forKeyCache(cfm)).serialize(entry, out);
         }
 
         public Future<Pair<KeyCacheKey, RowIndexEntry>> deserialize(DataInputStream input, ColumnFamilyStore cfs) throws IOException
@@ -443,7 +472,10 @@ public class CacheService implements CacheServiceMBean
                 RowIndexEntry.Serializer.skipPromotedIndex(input);
                 return null;
             }
-            RowIndexEntry entry = reader.descriptor.getFormat().getIndexSerializer(reader.metadata).deserialize(input, reader.descriptor.version);
+            RowIndexEntry.IndexSerializer<?> indexSerializer = reader.descriptor.getFormat().getIndexSerializer(reader.metadata,
+                                                                                                                reader.descriptor.version,
+                                                                                                                SerializationHeader.forKeyCache(cfs.metadata));
+            RowIndexEntry entry = indexSerializer.deserialize(input);
             return Futures.immediateFuture(Pair.create(new KeyCacheKey(cfs.metadata.cfId, reader.descriptor, key), entry));
         }
 

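Note the behavioral shift in the row cache serializer above: the rebuilt cache entry now holds only the first rowsToCache rows of the partition, enforced through DataLimits.cqlLimits(rowsToCache), rather than the whole partition. The idea in isolation, sketched with a generic row type instead of the Cassandra iterators:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Hedged sketch of "cache only the head of the partition"; R is any row type.
    final class BoundedCacheEntry<R>
    {
        final List<R> head;      // at most rowsToCache rows, in clustering order
        final boolean complete;  // false if the partition had more rows than we kept

        BoundedCacheEntry(Iterator<R> partition, int rowsToCache)
        {
            head = new ArrayList<>();
            while (partition.hasNext() && head.size() < rowsToCache)
                head.add(partition.next());
            complete = !partition.hasNext();
        }
    }

A read can only be served from such an entry if it is answerable from the cached head alone, hence the completeness flag.
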
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
new file mode 100644
index 0000000..b2d1954
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.net.AsyncOneResponse;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class DataResolver extends ResponseResolver
+{
+    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<AsyncOneResponse>());
+
+    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        super(keyspace, command, consistency, maxResponseCount);
+    }
+
+    public PartitionIterator getData()
+    {
+        ReadResponse response = responses.iterator().next().payload;
+        return UnfilteredPartitionIterators.filter(response.makeIterator(), command.nowInSec());
+    }
+
+    public PartitionIterator resolve()
+    {
+        // We could get more responses while this method runs, which is fine (we're happy to ignore any response not
+        // present at the beginning of this method), so grab the response count once and use it throughout the method.
+        int count = responses.size();
+        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
+        InetAddress[] sources = new InetAddress[count];
+        for (int i = 0; i < count; i++)
+        {
+            MessageIn<ReadResponse> msg = responses.get(i);
+            iters.add(msg.payload.makeIterator());
+            sources[i] = msg.from;
+        }
+
+        // Even though every response should honor the limit, we might have more than requested post-reconciliation,
+        // so ensure we're respecting the limit.
+        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true);
+        return new CountingPartitionIterator(mergeWithShortReadProtection(iters, sources, counter), counter);
+    }
+
+    private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter)
+    {
+        // If we have only one result, there is no read repair to do and we can't get short reads
+        if (results.size() == 1)
+            return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec());
+
+        UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources);
+
+        // So-called "short reads" stem from nodes returning only a subset of the results they have for a partition due to the limit,
+        // but that subset not being enough post-reconciliation. So if we don't have a limit, don't bother.
+        if (command.limits().isUnlimited())
+            return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+
+        for (int i = 0; i < results.size(); i++)
+            results.set(i, new ShortReadProtectedIterator(sources[i], results.get(i), resultCounter));
+
+        return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
+    }
+
+    private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener
+    {
+        private final InetAddress[] sources;
+
+        public RepairMergeListener(InetAddress[] sources)
+        {
+            this.sources = sources;
+        }
+
+        public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+        {
+            return new MergeListener(partitionKey, columns(versions), isReversed(versions));
+        }
+
+        private PartitionColumns columns(List<UnfilteredRowIterator> versions)
+        {
+            Columns statics = Columns.NONE;
+            Columns regulars = Columns.NONE;
+            for (UnfilteredRowIterator iter : versions)
+            {
+                if (iter == null)
+                    continue;
+
+                PartitionColumns cols = iter.columns();
+                statics = statics.mergeTo(cols.statics);
+                regulars = regulars.mergeTo(cols.regulars);
+            }
+            return new PartitionColumns(statics, regulars);
+        }
+
+        private boolean isReversed(List<UnfilteredRowIterator> versions)
+        {
+            assert !versions.isEmpty();
+            // Everything will be in the same order
+            return versions.get(0).isReverseOrder();
+        }
+
+        public void close()
+        {
+            try
+            {
+                FBUtilities.waitOnFutures(repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+            }
+            catch (TimeoutException ex)
+            {
+                // We got all responses, but timed out while repairing
+                int blockFor = consistency.blockFor(keyspace);
+                if (Tracing.isTracing())
+                    Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
+                else
+                    logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
+
+                throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
+            }
+        }
+
+        private class MergeListener implements UnfilteredRowIterators.MergeListener
+        {
+            private final DecoratedKey partitionKey;
+            private final PartitionColumns columns;
+            private final boolean isReversed;
+            private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length];
+
+            private final Row.Writer[] currentRows = new Row.Writer[sources.length];
+            private Clustering currentClustering;
+            private ColumnDefinition currentColumn;
+
+            private final Slice.Bound[] markerOpen = new Slice.Bound[sources.length];
+            private final DeletionTime[] markerTime = new DeletionTime[sources.length];
+
+            public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed)
+            {
+                this.partitionKey = partitionKey;
+                this.columns = columns;
+                this.isReversed = isReversed;
+            }
+
+            private PartitionUpdate update(int i)
+            {
+                PartitionUpdate upd = repairs[i];
+                if (upd == null)
+                {
+                    upd = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
+                    repairs[i] = upd;
+                }
+                return upd;
+            }
+
+            private Row.Writer currentRow(int i)
+            {
+                Row.Writer row = currentRows[i];
+                if (row == null)
+                {
+                    row = currentClustering == Clustering.STATIC_CLUSTERING ? update(i).staticWriter() : update(i).writer();
+                    currentClustering.writeTo(row);
+                    currentRows[i] = row;
+                }
+                return row;
+            }
+
+            public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    DeletionTime version = versions[i];
+                    if (mergedDeletion.supersedes(versions[i]))
+                        update(i).addPartitionDeletion(mergedDeletion);
+                }
+            }
+
+            public void onMergingRows(Clustering clustering,
+                                      LivenessInfo mergedInfo,
+                                      DeletionTime mergedDeletion,
+                                      Row[] versions)
+            {
+                currentClustering = clustering;
+                for (int i = 0; i < versions.length; i++)
+                {
+                    Row version = versions[i];
+
+                    if (version == null || mergedInfo.supersedes(version.primaryKeyLivenessInfo()))
+                        currentRow(i).writePartitionKeyLivenessInfo(mergedInfo);
+
+                    if (version == null || mergedDeletion.supersedes(version.deletion()))
+                        currentRow(i).writeRowDeletion(mergedDeletion);
+                }
+            }
+
+            public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+            {
+                currentColumn = c;
+                for (int i = 0; i < versions.length; i++)
+                {
+                    DeletionTime version = versions[i] == null ? DeletionTime.LIVE : versions[i];
+                    if (mergedCompositeDeletion.supersedes(version))
+                        currentRow(i).writeComplexDeletion(c, mergedCompositeDeletion);
+                }
+            }
+
+            public void onMergedCells(Cell mergedCell, Cell[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    Cell version = versions[i];
+                    Cell toAdd = version == null ? mergedCell : Cells.diff(mergedCell, version);
+                    if (toAdd != null)
+                        toAdd.writeTo(currentRow(i));
+                }
+            }
+
+            public void onRowDone()
+            {
+                for (int i = 0; i < currentRows.length; i++)
+                {
+                    if (currentRows[i] != null)
+                        currentRows[i].endOfRow();
+                }
+                Arrays.fill(currentRows, null);
+            }
+
+            public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+            {
+                for (int i = 0; i < versions.length; i++)
+                {
+                    RangeTombstoneMarker marker = versions[i];
+                    // Note that a boundary is both a close and an open, so it's not one or the other
+                    if (merged.isClose(isReversed) && markerOpen[i] != null)
+                    {
+                        Slice.Bound open = markerOpen[i];
+                        Slice.Bound close = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingCloseBound(isReversed).clustering() : merged.clustering();
+                        update(i).addRangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]);
+                    }
+                    if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
+                    {
+                        markerOpen[i] = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingOpenBound(isReversed).clustering() : merged.clustering();
+                        markerTime[i] = merged.openDeletionTime(isReversed);
+                    }
+                }
+            }
+
+            public void close()
+            {
+                for (int i = 0; i < repairs.length; i++)
+                {
+                    if (repairs[i] == null)
+                        continue;
+
+                    // use a separate verb here because we don't want these to get the white-glove hint-
+                    // on-timeout behavior that a "real" mutation gets
+                    Tracing.trace("Sending read-repair-mutation to {}", sources[i]);
+                    MessageOut<Mutation> msg = new Mutation(repairs[i]).createMessage(MessagingService.Verb.READ_REPAIR);
+                    repairResults.add(MessagingService.instance().sendRR(msg, sources[i]));
+                }
+            }
+        }
+    }
+
+    private class ShortReadProtectedIterator extends CountingUnfilteredPartitionIterator
+    {
+        private final InetAddress source;
+        private final DataLimits.Counter postReconciliationCounter;
+
+        private ShortReadProtectedIterator(InetAddress source, UnfilteredPartitionIterator iterator, DataLimits.Counter postReconciliationCounter)
+        {
+            super(iterator, command.limits().newCounter(command.nowInSec(), false));
+            this.source = source;
+            this.postReconciliationCounter = postReconciliationCounter;
+        }
+
+        @Override
+        public UnfilteredRowIterator next()
+        {
+            return new ShortReadProtectedRowIterator(super.next());
+        }
+
+        private class ShortReadProtectedRowIterator extends WrappingUnfilteredRowIterator
+        {
+            private boolean initialReadIsDone;
+            private UnfilteredRowIterator shortReadContinuation;
+            private Clustering lastClustering;
+
+            ShortReadProtectedRowIterator(UnfilteredRowIterator iter)
+            {
+                super(iter);
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                if (super.hasNext())
+                    return true;
+
+                initialReadIsDone = true;
+
+                if (shortReadContinuation != null && shortReadContinuation.hasNext())
+                    return true;
+
+                return checkForShortRead();
+            }
+
+            @Override
+            public Unfiltered next()
+            {
+                Unfiltered next = initialReadIsDone ? shortReadContinuation.next() : super.next();
+
+                if (next.kind() == Unfiltered.Kind.ROW)
+                    lastClustering = ((Row)next).clustering();
+
+                return next;
+            }
+
+            @Override
+            public void close()
+            {
+                try
+                {
+                    super.close();
+                }
+                finally
+                {
+                    if (shortReadContinuation != null)
+                        shortReadContinuation.close();
+                }
+            }
+
+            private boolean checkForShortRead()
+            {
+                assert shortReadContinuation == null || !shortReadContinuation.hasNext();
+
+                // We have a short read if the node this result comes from has returned the requested number of
+                // rows for that partition (i.e. it has stopped returning results due to the limit), but some of
+                // those results haven't made it into the final result post-reconciliation due to other nodes'
+                // tombstones. If that is the case, then the node might have more results that we should fetch,
+                // as otherwise we might return fewer results than required, or results that shouldn't be returned
+                // (because the node has tombstones that hide future results from other nodes but that haven't
+                // been returned due to the limit).
+                // Also note that we only get here once all the results for this node have been returned, so if
+                // the node had returned the requested number but we still get here, it implies some results were
+                // skipped during reconciliation.
+                if (!counter.isDoneForPartition())
+                    return false;
+
+                assert !postReconciliationCounter.isDoneForPartition();
+
+                // We need to try to query enough additional results to fulfill our query, but because we could still
+                // get short reads on that additional query, just querying the number of results we miss may not be
+                // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition()), only
+                // x rows (postReconciliationCounter.countedInCurrentPartition()) made it into the final result.
+                // So our ratio of live rows to answered rows is x/n, and since we miss n-x rows, we estimate that
+                // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n.
+                // Also note that it's ok if we retrieve more results than necessary since our top-level iterator is a
+                // counting iterator.
+                int n = counter.countedInCurrentPartition();
+                int x = postReconciliationCounter.countedInCurrentPartition();
+                int toQuery = x == 0
+                              ? n * 2     // nothing made it into the final result, so (somewhat arbitrarily) ask for twice as much
+                              : Math.max(((n * n) / x) - n, 1);
+
+                DataLimits retryLimits = command.limits().forShortReadRetry(toQuery);
+                ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey());
+                ClusteringIndexFilter retryFilter = lastClustering == null ? filter : filter.forPaging(metadata().comparator, lastClustering, false);
+                SinglePartitionReadCommand<?> cmd = SinglePartitionReadCommand.create(command.metadata(),
+                                                                                      command.nowInSec(),
+                                                                                      command.columnFilter(),
+                                                                                      command.rowFilter(),
+                                                                                      retryLimits,
+                                                                                      partitionKey(),
+                                                                                      retryFilter);
+
+                shortReadContinuation = doShortReadRetry(cmd);
+                return shortReadContinuation.hasNext();
+            }
+
+            private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand<?> retryCommand)
+            {
+                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
+                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+                if (StorageProxy.canDoLocalRequest(source))
+                    StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
+                else
+                    MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
+
+                // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
+                handler.awaitResults();
+                assert resolver.responses.size() == 1;
+                return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(), retryCommand);
+            }
+        }
+    }
+
+    public boolean isDataPresent()
+    {
+        return !responses.isEmpty();
+    }
+}

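The retry estimate in checkForShortRead() deserves pulling out of the diff noise: if a replica answered n rows and only x of them survived reconciliation, the live-to-answered ratio is x/n, and solving m * x/n = n - x for the shortfall gives m = n^2/x - n. As a standalone helper (hypothetical name) with a worked value:

    final class ShortReadMath
    {
        // answered = rows the replica returned (n); survived = rows that made it
        // into the merged result (x). Returns how many extra rows to request.
        static int shortReadRetryCount(int answered, int survived)
        {
            if (survived == 0)
                return answered * 2;  // nothing survived: (somewhat arbitrarily) ask for twice as much
            return Math.max((answered * answered) / survived - answered, 1);
        }

        public static void main(String[] args)
        {
            // 100 rows answered, 80 survived: 100*100/80 - 100 = 25 extra rows
            System.out.println(shortReadRetryCount(100, 80)); // prints 25
        }
    }

Over-requesting is harmless here, since the top-level counting iterator still enforces the user limit.
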
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java
new file mode 100644
index 0000000..12b0626
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.net.MessageIn;
+
+public class DigestResolver extends ResponseResolver
+{
+    private volatile ReadResponse dataResponse;
+
+    public DigestResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        super(keyspace, command, consistency, maxResponseCount);
+    }
+
+    @Override
+    public void preprocess(MessageIn<ReadResponse> message)
+    {
+        super.preprocess(message);
+        if (dataResponse == null && !message.payload.isDigestQuery())
+            dataResponse = message.payload;
+    }
+
+    /**
+     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
+     */
+    public PartitionIterator getData()
+    {
+        assert isDataPresent();
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+    }
+
+    /*
+     * This method handles two different scenarios:
+     *
+     * a) we're handling the initial read of data from the closest replica + digests
+     *    from the rest. In this case we check the digests against each other,
+     *    throw an exception if there is a mismatch, otherwise return the data.
+     *
+     * b) we're checking additional digests that arrived after the minimum needed to satisfy
+     *    the requested ConsistencyLevel, i.e. an asynchronous read-repair check.
+     */
+    public PartitionIterator resolve() throws DigestMismatchException
+    {
+        if (responses.size() == 1)
+            return getData();
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolving {} responses", responses.size());
+
+        long start = System.nanoTime();
+
+        // validate digests against each other; throw immediately on mismatch.
+        ByteBuffer digest = null;
+        for (MessageIn<ReadResponse> message : responses)
+        {
+            ReadResponse response = message.payload;
+
+            ByteBuffer newDigest = response.digest();
+            if (digest == null)
+                digest = newDigest;
+            else if (!digest.equals(newDigest))
+                // rely on the fact that only single partition queries use digests
+                throw new DigestMismatchException(((SinglePartitionReadCommand)command).partitionKey(), digest, newDigest);
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+
+        return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(), command.nowInSec());
+    }
+
+    public boolean isDataPresent()
+    {
+        return dataResponse != null;
+    }
+}

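DigestResolver.resolve() reduces to a single comparison loop: every response's digest must be byte-identical, and the first mismatch aborts resolution (upstream, that triggers a full data read plus read repair). The same loop in self-contained form; DigestMismatch is a hypothetical stand-in for the real exception type:

    import java.nio.ByteBuffer;
    import java.util.List;

    final class DigestCheck
    {
        static void assertAllMatch(List<ByteBuffer> digests) throws DigestMismatch
        {
            ByteBuffer first = null;
            for (ByteBuffer d : digests)
            {
                if (first == null)
                    first = d;                          // remember the first digest seen
                else if (!first.equals(d))
                    throw new DigestMismatch(first, d); // replicas disagree
            }
        }

        static final class DigestMismatch extends Exception
        {
            DigestMismatch(ByteBuffer expected, ByteBuffer got)
            {
                super("digest mismatch: " + expected + " != " + got);
            }
        }
    }
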
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/IReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IReadCommand.java b/src/java/org/apache/cassandra/service/IReadCommand.java
deleted file mode 100644
index c6a129e..0000000
--- a/src/java/org/apache/cassandra/service/IReadCommand.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-public interface IReadCommand
-{
-    public String getKeyspace();
-    public long getTimeout();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/IResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/IResponseResolver.java b/src/java/org/apache/cassandra/service/IResponseResolver.java
deleted file mode 100644
index 17c8bff..0000000
--- a/src/java/org/apache/cassandra/service/IResponseResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.apache.cassandra.net.MessageIn;
-
-public interface IResponseResolver<TMessage, TResolved> {
-
-    /**
-     * This method resolves the responses that are passed in. For example: if
-     * it's a write response then all we get is true or false return values,
-     * which imply whether the writes were successful; but for reads it's more
-     * complicated: you need to look at the responses and then, based on the
-     * differences, schedule repairs. Hence you need to derive a response
-     * resolver based on your needs from this interface.
-     */
-    public TResolved resolve() throws DigestMismatchException;
-
-    public boolean isDataPresent();
-
-    /**
-     * returns the data response without comparing with any digests
-     */
-    public TResolved getData();
-
-    public void preprocess(MessageIn<TMessage> message);
-    public Iterable<MessageIn<TMessage>> getMessages();
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java b/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
deleted file mode 100644
index 640681b..0000000
--- a/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import com.google.common.collect.AbstractIterator;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.AsyncOneResponse;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.Pair;
-
-/**
- * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
- * to the most recent ColumnFamily and setting up read repairs as necessary.
- */
-public class RangeSliceResponseResolver implements IResponseResolver<RangeSliceReply, Iterable<Row>>
-{
-    private static final Comparator<Pair<Row,InetAddress>> pairComparator = new Comparator<Pair<Row, InetAddress>>()
-    {
-        public int compare(Pair<Row, InetAddress> o1, Pair<Row, InetAddress> o2)
-        {
-            return o1.left.key.compareTo(o2.left.key);
-        }
-    };
-
-    private final String keyspaceName;
-    private final long timestamp;
-    private List<InetAddress> sources;
-    protected final Collection<MessageIn<RangeSliceReply>> responses = new ConcurrentLinkedQueue<MessageIn<RangeSliceReply>>();
-    public final List<AsyncOneResponse> repairResults = new ArrayList<AsyncOneResponse>();
-
-    public RangeSliceResponseResolver(String keyspaceName, long timestamp)
-    {
-        this.keyspaceName = keyspaceName;
-        this.timestamp = timestamp;
-    }
-
-    public void setSources(List<InetAddress> endpoints)
-    {
-        this.sources = endpoints;
-    }
-
-    public List<Row> getData()
-    {
-        MessageIn<RangeSliceReply> response = responses.iterator().next();
-        return response.payload.rows;
-    }
-
-    // Note: this would deserialize the response a 2nd time if getData was called first.
-    // (this is not currently an issue since we don't do read repair for range queries.)
-    public Iterable<Row> resolve()
-    {
-        ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size());
-        int n = 0;
-        for (MessageIn<RangeSliceReply> response : responses)
-        {
-            RangeSliceReply reply = response.payload;
-            n = Math.max(n, reply.rows.size());
-            iters.add(new RowIterator(reply.rows.iterator(), response.from));
-        }
-        // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        // TODO do we need to call close?
-        CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
-
-        List<Row> resolvedRows = new ArrayList<Row>(n);
-        while (iter.hasNext())
-            resolvedRows.add(iter.next());
-
-        return resolvedRows;
-    }
-
-    public void preprocess(MessageIn message)
-    {
-        responses.add(message);
-    }
-
-    public boolean isDataPresent()
-    {
-        return !responses.isEmpty();
-    }
-
-    private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>> implements CloseableIterator<Pair<Row,InetAddress>>
-    {
-        private final Iterator<Row> iter;
-        private final InetAddress source;
-
-        private RowIterator(Iterator<Row> iter, InetAddress source)
-        {
-            this.iter = iter;
-            this.source = source;
-        }
-
-        protected Pair<Row,InetAddress> computeNext()
-        {
-            return iter.hasNext() ? Pair.create(iter.next(), source) : endOfData();
-        }
-
-        public void close() {}
-    }
-
-    public Iterable<MessageIn<RangeSliceReply>> getMessages()
-    {
-        return responses;
-    }
-
-    private class Reducer extends MergeIterator.Reducer<Pair<Row,InetAddress>, Row>
-    {
-        List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
-        List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
-        DecoratedKey key;
-
-        public void reduce(Pair<Row,InetAddress> current)
-        {
-            key = current.left.key;
-            versions.add(current.left.cf);
-            versionSources.add(current.right);
-        }
-
-        protected Row getReduced()
-        {
-            ColumnFamily resolved = versions.size() > 1
-                                  ? RowDataResolver.resolveSuperset(versions, timestamp)
-                                  : versions.get(0);
-            if (versions.size() < sources.size())
-            {
-                // add placeholder rows for sources that didn't have any data, so maybeScheduleRepairs sees them
-                for (InetAddress source : sources)
-                {
-                    if (!versionSources.contains(source))
-                    {
-                        versions.add(null);
-                        versionSources.add(source);
-                    }
-                }
-            }
-            // resolved can be null even if not every version is null, because of the call to removeDeleted in resolveSuperset
-            if (resolved != null)
-                repairResults.addAll(RowDataResolver.scheduleRepairs(resolved, keyspaceName, key, versions, versionSources));
-            versions.clear();
-            versionSources.clear();
-            return new Row(key, resolved);
-        }
-    }
-}
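
Stepping back, the deleted resolver's resolve() is a sort-merge-reduce: each replica's rows arrive sorted by key, the streams are merged, and every run of same-key versions is reduced to one resolved row. Below is a minimal, self-contained sketch of that shape in plain Java (no Cassandra types; all names are illustrative, and simple last-write-wins stands in for the real column-level superset merge):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class MergeReduceSketch
{
    // One (key, timestamp, value) triple as reported by a single replica.
    static final class Versioned
    {
        final String key;
        final long timestamp;
        final String value;

        Versioned(String key, long timestamp, String value)
        {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }

        public String toString() { return key + "@" + timestamp + "=" + value; }
    }

    /**
     * Merges per-replica lists (each sorted by key) and, for every key, keeps
     * the version with the highest timestamp. Same shape as the MergeIterator
     * plus Reducer pair above: merge on key, then reduce each run of entries
     * sharing a key down to one resolved result.
     */
    static List<Versioned> mergeAndReduce(List<List<Versioned>> perReplica)
    {
        // Flatten then sort; a heap-based k-way merge avoids the full sort
        // but is more code for the same idea.
        List<Versioned> all = new ArrayList<>();
        for (List<Versioned> replica : perReplica)
            all.addAll(replica);
        all.sort(Comparator.comparing(v -> v.key));

        List<Versioned> resolved = new ArrayList<>();
        int i = 0;
        while (i < all.size())
        {
            Versioned best = all.get(i);
            int j = i + 1;
            // Reduce the run of entries sharing the same key.
            while (j < all.size() && all.get(j).key.equals(best.key))
            {
                if (all.get(j).timestamp > best.timestamp)
                    best = all.get(j);
                j++;
            }
            resolved.add(best);
            i = j;
        }
        return resolved;
    }

    public static void main(String[] args)
    {
        List<Versioned> replica1 = List.of(new Versioned("a", 1, "stale"), new Versioned("b", 5, "b1"));
        List<Versioned> replica2 = List.of(new Versioned("a", 3, "fresh"));
        System.out.println(mergeAndReduce(List.of(replica1, replica2))); // [a@3=fresh, b@5=b1]
    }
}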

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
deleted file mode 100644
index 0f3726c..0000000
--- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import org.apache.cassandra.db.AbstractRangeCommand;
-import org.apache.cassandra.db.RangeSliceReply;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.tracing.Tracing;
-
-public class RangeSliceVerbHandler implements IVerbHandler<AbstractRangeCommand>
-{
-    public void doVerb(MessageIn<AbstractRangeCommand> message, int id)
-    {
-        if (StorageService.instance.isBootstrapMode())
-        {
-            /* Don't service reads! */
-            throw new RuntimeException("Cannot service reads while bootstrapping!");
-        }
-        RangeSliceReply reply = new RangeSliceReply(message.payload.executeLocally());
-        Tracing.trace("Enqueuing response to {}", message.from);
-        MessagingService.instance().sendReply(reply.createMessage(), id, message.from);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 0c008e7..d548019 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
@@ -46,16 +46,16 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
-public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFailure<TMessage>
+public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
-    public final IResponseResolver<TMessage, TResolved> resolver;
+    public final ResponseResolver resolver;
     private final SimpleCondition condition = new SimpleCondition();
-    final long start;
+    private final long start;
     final int blockfor;
     final List<InetAddress> endpoints;
-    private final IReadCommand command;
+    private final ReadCommand command;
     private final ConsistencyLevel consistencyLevel;
     private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
@@ -69,14 +69,17 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> filteredEndpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints)
     {
-        this(resolver, consistencyLevel, consistencyLevel.blockFor(Keyspace.open(command.getKeyspace())), command, Keyspace.open(command.getKeyspace()), filteredEndpoints);
-        if (logger.isTraceEnabled())
-            logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
+        this(resolver,
+             consistencyLevel,
+             consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)),
+             command,
+             Keyspace.open(command.metadata().ksName),
+             filteredEndpoints);
     }
 
-    public ReadCallback(IResponseResolver<TMessage, TResolved> resolver, ConsistencyLevel consistencyLevel, int blockfor, IReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
     {
         this.command = command;
         this.keyspace = keyspace;
@@ -86,7 +89,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         this.start = System.nanoTime();
         this.endpoints = endpoints;
         // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
-        assert !(resolver instanceof RangeSliceResponseResolver) || blockfor >= endpoints.size();
+        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
+
+        if (logger.isTraceEnabled())
+            logger.trace(String.format("Blockfor is %s; setting up requests to %s", blockfor, StringUtils.join(this.endpoints, ",")));
     }
 
     public boolean await(long timePastStart, TimeUnit unit)
@@ -102,31 +108,46 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         }
     }
 
-    public TResolved get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
-        if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
+        boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
+        boolean failed = blockfor + failures > endpoints.size();
+        if (signaled && !failed)
+            return;
+
+        if (Tracing.isTracing())
         {
-            // Same as for writes, see AbstractWriteResponseHandler
-            ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
-            Tracing.trace("Read timeout: {}", ex.toString());
-            if (logger.isDebugEnabled())
-                logger.debug("Read timeout: {}", ex.toString());
-            throw ex;
+            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
         }
-
-        if (blockfor + failures > endpoints.size())
+        else if (logger.isDebugEnabled())
         {
-            ReadFailureException ex = new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent());
-
-            if (logger.isDebugEnabled())
-                logger.debug("Read failure: {}", ex.toString());
-            throw ex;
+            String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
+            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
         }
 
-        return blockfor == 1 ? resolver.getData() : resolver.resolve();
+        // Same as for writes, see AbstractWriteResponseHandler
+        throw failed
+            ? new ReadFailureException(consistencyLevel, received, failures, blockfor, resolver.isDataPresent())
+            : new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
+    }
+
+    public PartitionIterator get() throws ReadFailureException, ReadTimeoutException, DigestMismatchException
+    {
+        awaitResults();
+
+        PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
+        if (logger.isDebugEnabled())
+            logger.debug("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+        return result;
+    }
+
+    public int blockFor()
+    {
+        return blockfor;
     }
 
-    public void response(MessageIn<TMessage> message)
+    public void response(MessageIn<ReadResponse> message)
     {
         resolver.preprocess(message);
         int n = waitingFor(message.from)
@@ -165,13 +186,13 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
         return received;
     }
 
-    public void response(TMessage result)
+    public void response(ReadResponse result)
     {
-        MessageIn<TMessage> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
-                                                       result,
-                                                       Collections.<String, byte[]>emptyMap(),
-                                                       MessagingService.Verb.INTERNAL_RESPONSE,
-                                                       MessagingService.current_version);
+        MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
+                                                           result,
+                                                           Collections.<String, byte[]>emptyMap(),
+                                                           MessagingService.Verb.INTERNAL_RESPONSE,
+                                                           MessagingService.current_version);
         response(message);
     }
 
@@ -196,7 +217,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
 
         public void run()
         {
-            // If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch.
+            // If the resolver is a DigestResolver, we need to do a full data read if there is a mismatch.
             // Otherwise, resolve will send the repairs directly if needs be (and in that case we should never
             // get a digest mismatch)
             try
@@ -205,7 +226,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
             }
             catch (DigestMismatchException e)
             {
-                assert resolver instanceof RowDigestResolver;
+                assert resolver instanceof DigestResolver;
 
                 if (traceState != null)
                     traceState.trace("Digest mismatch: {}", e.toString());
@@ -214,11 +235,10 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallbackWithFail
                 
                 ReadRepairMetrics.repairedBackground.mark();
                 
-                ReadCommand readCommand = (ReadCommand) command;
-                final RowDataResolver repairResolver = new RowDataResolver(readCommand.ksName, readCommand.key, readCommand.filter(), readCommand.timestamp, endpoints.size());
+                final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
-                MessageOut<ReadCommand> message = ((ReadCommand) command).createMessage();
+                MessageOut<ReadCommand> message = command.createMessage();
                 for (InetAddress endpoint : endpoints)
                     MessagingService.instance().sendRR(message, endpoint, repairHandler);
             }
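
In the rewritten ReadCallback, awaitResults() separates the two unhappy endings of a read: a timeout (the condition was not signalled within command.getTimeout()) and an outright failure (so many explicit failure responses arrived that blockfor can no longer be met). A standalone sketch of that failure arithmetic, with illustrative replica counts:

public class BlockForSketch
{
    /**
     * Mirrors the condition in awaitResults(): once (blockfor + failures)
     * exceeds the number of endpoints, even if every remaining replica
     * answered we could not reach blockfor successes, so the read fails
     * fast instead of waiting out the timeout.
     */
    static boolean cannotSucceed(int blockfor, int failures, int endpoints)
    {
        return blockfor + failures > endpoints;
    }

    public static void main(String[] args)
    {
        // QUORUM over RF=3 blocks for 2 responses against 3 endpoints.
        System.out.println(cannotSucceed(2, 0, 3)); // false: still possible
        System.out.println(cannotSucceed(2, 1, 3)); // false: 2 replicas left, 2 needed
        System.out.println(cannotSucceed(2, 2, 3)); // true: at most 1 success remains
    }
}

Failing fast on the second case matters: once blockfor + failures exceeds the endpoint count, waiting for the rest of the timeout cannot change the outcome.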

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ResponseResolver.java b/src/java/org/apache/cassandra/service/ResponseResolver.java
new file mode 100644
index 0000000..e7c94a1
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ResponseResolver.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.concurrent.Accumulator;
+
+public abstract class ResponseResolver
+{
+    protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
+
+    protected final Keyspace keyspace;
+    protected final ReadCommand command;
+    protected final ConsistencyLevel consistency;
+
+    // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
+    protected final Accumulator<MessageIn<ReadResponse>> responses;
+
+    public ResponseResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    {
+        this.keyspace = keyspace;
+        this.command = command;
+        this.consistency = consistency;
+        this.responses = new Accumulator<>(maxResponseCount);
+    }
+
+    public abstract PartitionIterator getData();
+    public abstract PartitionIterator resolve() throws DigestMismatchException;
+
+    public abstract boolean isDataPresent();
+
+    public void preprocess(MessageIn<ReadResponse> message)
+    {
+        responses.add(message);
+    }
+
+    public Iterable<MessageIn<ReadResponse>> getMessages()
+    {
+        return responses;
+    }
+}
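
The new base class keeps responses in an Accumulator rather than the ConcurrentLinkedQueue the old resolvers used. The following is a rough sketch of how a fixed-capacity, append-only, lock-free collection of that kind can be built. It illustrates the idea only; it is not Cassandra's actual Accumulator, which additionally tracks how many claimed slots have been safely published to readers (a subtlety the sketch notes but does not solve):

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Fixed-capacity, append-only, lock-free collection: a sketch of the idea only. */
public class AccumulatorSketch<E> implements Iterable<E>
{
    private final AtomicReferenceArray<E> values;
    private final AtomicInteger nextIndex = new AtomicInteger();

    public AccumulatorSketch(int capacity)
    {
        values = new AtomicReferenceArray<>(capacity);
    }

    /** Claims a slot with a single atomic increment; no locks, no resizing. */
    public void add(E value)
    {
        int i = nextIndex.getAndIncrement();
        if (i >= values.length())
            throw new IllegalStateException("over capacity");
        // NOTE: between getAndIncrement() and set(), a concurrent reader could
        // observe a null slot; the real Accumulator guards against this by
        // tracking a separate count of fully published values.
        values.set(i, value);
    }

    public int size()
    {
        return Math.min(nextIndex.get(), values.length());
    }

    public Iterator<E> iterator()
    {
        final int snapshot = size();
        return new Iterator<E>()
        {
            int i = 0;
            public boolean hasNext() { return i < snapshot; }
            public E next()
            {
                if (!hasNext()) throw new NoSuchElementException();
                return values.get(i++);
            }
        };
    }
}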

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
deleted file mode 100644
index e935ce7..0000000
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.IDiskAtomFilter;
-import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.net.*;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.FBUtilities;
-
-public class RowDataResolver extends AbstractRowResolver
-{
-    private int maxLiveCount = 0;
-    public List<AsyncOneResponse> repairResults = Collections.emptyList();
-    private final IDiskAtomFilter filter;
-    private final long timestamp;
-
-    public RowDataResolver(String keyspaceName, ByteBuffer key, IDiskAtomFilter qFilter, long timestamp, int maxResponseCount)
-    {
-        super(key, keyspaceName, maxResponseCount);
-        this.filter = qFilter;
-        this.timestamp = timestamp;
-    }
-
-    /*
-    * This method handles the following scenario:
-    *
-    * there was a mismatch on the initial read, so we redid the digest requests
-    * as full data reads.  In this case we need to compute the most recent version
-    * of each column, and send diffs to out-of-date replicas.
-    */
-    public Row resolve() throws DigestMismatchException
-    {
-        int replyCount = replies.size();
-        if (logger.isDebugEnabled())
-            logger.debug("resolving {} responses", replyCount);
-        long start = System.nanoTime();
-
-        ColumnFamily resolved;
-        if (replyCount > 1)
-        {
-            List<ColumnFamily> versions = new ArrayList<>(replyCount);
-            List<InetAddress> endpoints = new ArrayList<>(replyCount);
-
-            for (MessageIn<ReadResponse> message : replies)
-            {
-                ReadResponse response = message.payload;
-                ColumnFamily cf = response.row().cf;
-                assert !response.isDigestQuery() : "Received digest response to repair read from " + message.from;
-                versions.add(cf);
-                endpoints.add(message.from);
-
-                // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643
-                int liveCount = cf == null ? 0 : filter.getLiveCount(cf, timestamp);
-                if (liveCount > maxLiveCount)
-                    maxLiveCount = liveCount;
-            }
-
-            resolved = resolveSuperset(versions, timestamp);
-            if (logger.isDebugEnabled())
-                logger.debug("versions merged");
-
-            // send updates to any replica that was missing part of the full row
-            // (resolved can be null even if not every version is null, because of the call to removeDeleted in resolveSuperset)
-            if (resolved != null)
-                repairResults = scheduleRepairs(resolved, keyspaceName, key, versions, endpoints);
-        }
-        else
-        {
-            resolved = replies.get(0).payload.row().cf;
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-
-        return new Row(key, resolved);
-    }
-
-    /**
-     * For each row version, compare with resolved (the superset of all row versions);
- * if it is missing anything, send a mutation to the endpoint it came from.
-     */
-    public static List<AsyncOneResponse> scheduleRepairs(ColumnFamily resolved, String keyspaceName, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
-    {
-        List<AsyncOneResponse> results = new ArrayList<AsyncOneResponse>(versions.size());
-
-        for (int i = 0; i < versions.size(); i++)
-        {
-            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
-            if (diffCf == null) // no repair needs to happen
-                continue;
-
-            // create and send the mutation message based on the diff
-            Mutation mutation = new Mutation(keyspaceName, key.getKey(), diffCf);
-            // use a separate verb here because we don't want these to get the white glove hint-
-            // on-timeout behavior that a "real" mutation gets
-            Tracing.trace("Sending read-repair-mutation to {}", endpoints.get(i));
-            results.add(MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR),
-                                                           endpoints.get(i)));
-        }
-
-        return results;
-    }
-
-    static ColumnFamily resolveSuperset(Iterable<ColumnFamily> versions, long now)
-    {
-        assert Iterables.size(versions) > 0;
-
-        ColumnFamily resolved = null;
-        for (ColumnFamily cf : versions)
-        {
-            if (cf == null)
-                continue;
-
-            if (resolved == null)
-                resolved = cf.cloneMeShallow();
-            else
-                resolved.delete(cf);
-        }
-        if (resolved == null)
-            return null;
-
-        // mimic the collectCollatedColumn + removeDeleted path that getColumnFamily takes.
-        // this will handle removing columns and subcolumns that are suppressed by a row or
-        // supercolumn tombstone.
-        QueryFilter filter = new QueryFilter(null, resolved.metadata().cfName, new IdentityQueryFilter(), now);
-        List<CloseableIterator<Cell>> iters = new ArrayList<>(Iterables.size(versions));
-        for (ColumnFamily version : versions)
-            if (version != null)
-                iters.add(FBUtilities.closeableIterator(version.iterator()));
-        filter.collateColumns(resolved, iters, Integer.MIN_VALUE);
-        return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
-    }
-
-    public Row getData()
-    {
-        assert !replies.isEmpty();
-        return replies.get(0).payload.row();
-    }
-
-    public boolean isDataPresent()
-    {
-        return !replies.isEmpty();
-    }
-
-    public int getMaxLiveCount()
-    {
-        return maxLiveCount;
-    }
-}
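
The heart of the deleted class is the superset-and-diff pattern: compute one resolved view containing every replica's data, then send each replica only what it is missing. A self-contained sketch of that shape, with Map<String, String> standing in for ColumnFamily and the returned maps standing in for repair Mutations (all names illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReadRepairDiffSketch
{
    /**
     * For each replica's view, computes the entries it is missing relative to
     * the resolved superset; the same shape as scheduleRepairs() above, where
     * a null diff means that replica needs no repair.
     */
    static List<Map<String, String>> repairsNeeded(Map<String, String> resolved,
                                                   List<Map<String, String>> versions)
    {
        List<Map<String, String>> repairs = new ArrayList<>();
        for (Map<String, String> version : versions)
        {
            Map<String, String> diff = new HashMap<>(resolved);
            if (version != null)                       // null = replica returned no data
                diff.keySet().removeAll(version.keySet());
            repairs.add(diff.isEmpty() ? null : diff); // null = already up to date
        }
        return repairs;
    }

    public static void main(String[] args)
    {
        Map<String, String> resolved = Map.of("c1", "v1", "c2", "v2");
        List<Map<String, String>> versions = Arrays.asList(
                Map.of("c1", "v1", "c2", "v2"), // complete: no repair
                Map.of("c1", "v1"),             // missing c2
                null);                          // empty: needs everything
        System.out.println(repairsNeeded(resolved, versions));
        // e.g. [null, {c2=v2}, {c1=v1, c2=v2}]
    }
}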

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/RowDigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDigestResolver.java b/src/java/org/apache/cassandra/service/RowDigestResolver.java
deleted file mode 100644
index 82ccc1a..0000000
--- a/src/java/org/apache/cassandra/service/RowDigestResolver.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.service;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ReadResponse;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.net.MessageIn;
-
-public class RowDigestResolver extends AbstractRowResolver
-{
-    public RowDigestResolver(String keyspaceName, ByteBuffer key, int maxResponseCount)
-    {
-        super(key, keyspaceName, maxResponseCount);
-    }
-
-    /**
-     * Special case of resolve() so that CL.ONE reads never throw DigestMismatchException in the foreground
-     */
-    public Row getData()
-    {
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            ReadResponse result = message.payload;
-            if (!result.isDigestQuery())
-                return result.row();
-        }
-        return null;
-    }
-
-    /*
-     * This method handles two different scenarios:
-     *
-     * a) we're handling the initial read, of data from the closest replica + digests
-     *    from the rest.  In this case we check the digests against each other,
-     *    throw an exception if there is a mismatch, otherwise return the data row.
-     *
-     * b) we're checking additional digests that arrived after the minimum to handle
-     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
-     */
-    public Row resolve() throws DigestMismatchException
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("resolving {} responses", replies.size());
-
-        long start = System.nanoTime();
-
-        // validate digests against each other; throw immediately on mismatch.
-        // also extract the data reply, if any.
-        ColumnFamily data = null;
-        ByteBuffer digest = null;
-
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            ReadResponse response = message.payload;
-
-            ByteBuffer newDigest;
-            if (response.isDigestQuery())
-            {
-                newDigest = response.digest();
-            }
-            else
-            {
-                // note that this allows for multiple data replies, post-CASSANDRA-5932
-                data = response.row().cf;
-                newDigest = ColumnFamily.digest(data);
-            }
-
-            if (digest == null)
-                digest = newDigest;
-            else if (!digest.equals(newDigest))
-                throw new DigestMismatchException(key, digest, newDigest);
-        }
-
-        if (logger.isDebugEnabled())
-            logger.debug("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
-        return new Row(key, data);
-    }
-
-    public boolean isDataPresent()
-    {
-        for (MessageIn<ReadResponse> message : replies)
-        {
-            if (!message.payload.isDigestQuery())
-                return true;
-        }
-        return false;
-    }
-}
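
The digest check itself survives the refactor in the new DigestResolver: hash every full data response, take digest-only responses as-is, and throw on the first disagreement so the coordinator can retry with full data reads. A standalone sketch using MD5 as a stand-in hash (Cassandra computes its own digests; all names here are illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.List;

public class DigestCompareSketch
{
    static final class Response
    {
        final boolean digestOnly;
        final byte[] payload; // raw row data, or a precomputed digest

        Response(boolean digestOnly, byte[] payload)
        {
            this.digestOnly = digestOnly;
            this.payload = payload;
        }
    }

    /** Throws on the first disagreement, mirroring the resolve() loop above. */
    static void checkDigests(List<Response> responses) throws Exception
    {
        ByteBuffer digest = null;
        for (Response r : responses)
        {
            ByteBuffer newDigest = r.digestOnly
                                 ? ByteBuffer.wrap(r.payload) // replica sent a digest
                                 : ByteBuffer.wrap(MessageDigest.getInstance("MD5").digest(r.payload)); // hash the data ourselves
            if (digest == null)
                digest = newDigest;
            else if (!digest.equals(newDigest))
                throw new IllegalStateException("digest mismatch: full data read required");
        }
    }

    public static void main(String[] args) throws Exception
    {
        byte[] data = "row-contents".getBytes(StandardCharsets.UTF_8);
        byte[] md5 = MessageDigest.getInstance("MD5").digest(data);

        checkDigests(List.of(new Response(false, data), new Response(true, md5))); // agree: no exception
        try
        {
            checkDigests(List.of(new Response(true, md5), new Response(true, new byte[16])));
        }
        catch (IllegalStateException expected)
        {
            System.out.println(expected.getMessage()); // digest mismatch: full data read required
        }
    }
}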

