cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1071380 - in /cassandra/trunk: ./ contrib/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/...
Date Wed, 16 Feb 2011 20:12:39 GMT
Author: jbellis
Date: Wed Feb 16 20:12:37 2011
New Revision: 1071380

URL: http://svn.apache.org/viewvc?rev=1071380&view=rev
Log:
merge from 0.7

Added:
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/ReadResponseResolverTest.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7:1026516-1070530
+/cassandra/branches/cassandra-0.7:1026516-1070530,1070977
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Feb 16 20:12:37 2011
@@ -14,6 +14,7 @@
  * copy DecoratedKey.key when inserting into caches to avoid retaining
    a reference to the underlying buffer (CASSANDRA-2102)
  * format subcolumn names with subcomparator (CASSANDRA-2136)
+ * lower-latency read repair (CASSANDRA-2069)
 
 
 0.7.1

Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
-/cassandra/branches/cassandra-0.7/contrib:1026516-1070530
+/cassandra/branches/cassandra-0.7/contrib:1026516-1070530,1070977
 /cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/contrib:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1070530,1070977
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1070530,1070977
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1070530,1070977
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1070530,1070977
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Feb 16 20:12:37 2011
@@ -1,5 +1,5 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1068009,1068978
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1070530
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1070530,1070977
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/Stage.java Wed Feb 16 20:12:37 2011
@@ -32,6 +32,7 @@ public enum Stage
     MIGRATION,
     MISC,
     INTERNAL_RESPONSE,
+    READ_REPAIR,
     REPLICATE_ON_WRITE;
 
     public String getJmxType()
@@ -49,6 +50,7 @@ public enum Stage
             case READ:
             case REQUEST_RESPONSE:
             case REPLICATE_ON_WRITE:
+            case READ_REPAIR:
                 return "request";
             default:
                 throw new AssertionError("Unknown stage " + this);

Modified: cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java Wed Feb 16 20:12:37 2011
@@ -50,6 +50,7 @@ public class StageManager
         stages.put(Stage.ANTI_ENTROPY, new JMXEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
         stages.put(Stage.MIGRATION, new JMXEnabledThreadPoolExecutor(Stage.MIGRATION));
         stages.put(Stage.MISC, new JMXEnabledThreadPoolExecutor(Stage.MISC));
+        stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, Runtime.getRuntime().availableProcessors()));
     }
 
     private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RangeSliceCommand.java Wed Feb 16 20:12:37 2011
@@ -48,6 +48,7 @@ import org.apache.cassandra.io.ICompactS
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.ColumnParent;
 import org.apache.cassandra.thrift.SlicePredicate;
@@ -57,7 +58,7 @@ import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TSerializer;
 import org.apache.cassandra.thrift.TBinaryProtocol;
 
-public class RangeSliceCommand implements MessageProducer
+public class RangeSliceCommand implements MessageProducer, IReadCommand
 {
     private static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer();
     
@@ -114,6 +115,11 @@ public class RangeSliceCommand implement
         ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
         return serializer.deserialize(new DataInputStream(bis), message.getVersion());
     }
+
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
 }
 
 class RangeSliceCommandSerializer implements ICompactSerializer<RangeSliceCommand>

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Wed Feb 16 20:12:37 2011
@@ -31,11 +31,12 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageProducer;
+import org.apache.cassandra.service.IReadCommand;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 
-public abstract class ReadCommand implements MessageProducer
+public abstract class ReadCommand implements MessageProducer, IReadCommand
 {
     public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
     public static final byte CMD_TYPE_GET_SLICE = 2;
@@ -92,6 +93,11 @@ public abstract class ReadCommand implem
     {
         return ColumnFamily.getComparatorFor(table, getColumnFamilyName(), queryPath.superColumnName);
     }
+
+    public String getKeyspace()
+    {
+        return table;
+    }
 }
 
 class ReadCommandSerializer implements ICompactSerializer<ReadCommand>

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/BootStrapper.java Wed Feb 16 20:12:37 2011
@@ -287,5 +287,10 @@ public class BootStrapper
             token = StorageService.getPartitioner().getTokenFactory().fromString(new String(msg.getMessageBody(), Charsets.UTF_8));
             condition.signalAll();
         }
+
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Wed Feb 16 20:12:37 2011
@@ -96,6 +96,11 @@ class AsyncResult implements IAsyncResul
         }        
     }
 
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
+
     public InetAddress getFrom()
     {
         return from;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IMessageCallback.java Wed Feb 16 20:12:37 2011
@@ -23,4 +23,9 @@ package org.apache.cassandra.net;
 
 public interface IMessageCallback
 {
+    /**
+     * @return true if this callback is on the read path and its latency should be
+     * given as input to the dynamic snitch.
+     */
+    public boolean isLatencyForSnitch();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Feb 16 20:12:37 2011
@@ -143,7 +143,7 @@ public final class MessagingService impl
      */
     public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
     {
-        if (cb instanceof ReadCallback || cb instanceof AsyncResult)
+        if (cb.isLatencyForSnitch())
             addLatency(address, latency);
     }
 

Added: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractRowResolver.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,70 @@
+package org.apache.cassandra.service;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.FBUtilities;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+
+public abstract class AbstractRowResolver implements IResponseResolver<Row>
+{
+    protected static Logger logger = LoggerFactory.getLogger(AbstractRowResolver.class);
+
+    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY, -1);
+
+    protected final String table;
+    protected final ConcurrentMap<Message, ReadResponse> replies = new NonBlockingHashMap<Message, ReadResponse>();
+    protected final DecoratedKey key;
+
+    public AbstractRowResolver(ByteBuffer key, String table)
+    {
+        this.key = StorageService.getPartitioner().decorateKey(key);
+        this.table = table;
+    }
+
+    public void preprocess(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
+        {
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn), message.getVersion());
+            if (logger.isDebugEnabled())
+                logger.debug("Preprocessed {} response", result.isDigestQuery() ? "digest" : "data");
+            replies.put(message, result);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /** hack so local reads don't force de/serialization of an extra real Message */
+    public void injectPreProcessed(ReadResponse result)
+    {
+        assert replies.get(FAKE_MESSAGE) == null; // should only be one local reply
+        replies.put(FAKE_MESSAGE, result);
+    }
+
+    public Iterable<Message> getMessages()
+    {
+        return replies.keySet();
+    }
+
+    public int getMessageCount()
+    {
+        return replies.size();
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AsyncRepairCallback.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,41 @@
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.utils.WrappedRunnable;
+
+public class AsyncRepairCallback implements IAsyncCallback
+{
+    private final RowRepairResolver repairResolver;
+    private final int count;
+
+    public AsyncRepairCallback(RowRepairResolver repairResolver, int count)
+    {
+        this.repairResolver = repairResolver;
+        this.count = count;
+    }
+
+    public void response(Message message)
+    {
+        repairResolver.preprocess(message);
+        if (repairResolver.getMessageCount() == count)
+        {
+            StageManager.getStage(Stage.READ_REPAIR).execute(new WrappedRunnable()
+            {
+                protected void runMayThrow() throws DigestMismatchException, IOException
+                {
+                    repairResolver.resolve();
+                }
+            });
+        }
+    }
+
+    public boolean isLatencyForSnitch()
+    {
+        return true;
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterReadCallback.java Wed Feb 16 20:12:37 2011
@@ -22,12 +22,12 @@ package org.apache.cassandra.service;
 
 
 import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Table;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
@@ -44,12 +44,12 @@ public class DatacenterReadCallback<T> e
     private static final String localdc = snitch.getDatacenter(FBUtilities.getLocalAddress());
     private AtomicInteger localResponses;
     
-    public DatacenterReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+    public DatacenterReadCallback(IResponseResolver resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
-        super(resolver, consistencyLevel, table);
+        super(resolver, consistencyLevel, command, endpoints);
         localResponses = new AtomicInteger(blockfor);
     }
-    
+
     @Override
     public void response(Message message)
     {
@@ -68,14 +68,15 @@ public class DatacenterReadCallback<T> e
     @Override
     public void response(ReadResponse result)
     {
-        ((ReadResponseResolver) resolver).injectPreProcessed(result);
+        ((RowDigestResolver) resolver).injectPreProcessed(result);
 
         int n = localResponses.decrementAndGet();
-
         if (n == 0 && resolver.isDataPresent())
         {
             condition.signal();
         }
+
+        maybeResolveForRepair();
     }
     
     @Override
@@ -86,7 +87,7 @@ public class DatacenterReadCallback<T> e
 	}
 
     @Override
-    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    public void assureSufficientLiveNodes() throws UnavailableException
     {
         int localEndpoints = 0;
         for (InetAddress endpoint : endpoints)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Wed Feb 16 20:12:37 2011
@@ -115,4 +115,9 @@ public class DatacenterSyncWriteResponse
                 throw new UnavailableException();
         }
     }
+
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IReadCommand.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,6 @@
+package org.apache.cassandra.service;
+
+public interface IReadCommand
+{
+    public String getKeyspace();
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Wed Feb 16 20:12:37 2011
@@ -49,7 +49,6 @@ public class RangeSliceResponseResolver 
 
     public RangeSliceResponseResolver(String table, List<InetAddress> sources)
     {
-        assert sources.size() > 0;
         this.sources = sources;
         this.table = table;
     }
@@ -103,8 +102,8 @@ public class RangeSliceResponseResolver 
 
             protected Row getReduced()
             {
-                ColumnFamily resolved = ReadResponseResolver.resolveSuperset(versions);
-                ReadResponseResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
+                ColumnFamily resolved = RowRepairResolver.resolveSuperset(versions);
+                RowRepairResolver.maybeScheduleRepairs(resolved, table, key, versions, versionSources);
                 versions.clear();
                 versionSources.clear();
                 return new Row(key, resolved);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Wed Feb 16 20:12:37 2011
@@ -20,14 +20,20 @@ package org.apache.cassandra.service;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.Collection;
+import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -36,28 +42,61 @@ import org.apache.cassandra.net.Messagin
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
 import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class ReadCallback<T> implements IAsyncCallback
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
+    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
+    {
+        @Override
+        protected Random initialValue()
+        {
+            return new Random();
+        }
+    };
+
     public final IResponseResolver<T> resolver;
     protected final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
     protected final int blockfor;
-    
+    final List<InetAddress> endpoints;
+    private final IReadCommand command;
+
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, String table)
+    public ReadCallback(IResponseResolver<T> resolver, ConsistencyLevel consistencyLevel, IReadCommand command, List<InetAddress> endpoints)
     {
-        this.blockfor = determineBlockFor(consistencyLevel, table);
+        this.command = command;
+        this.blockfor = determineBlockFor(consistencyLevel, command.getKeyspace());
         this.resolver = resolver;
         this.startTime = System.currentTimeMillis();
-
-        logger.debug("ReadCallback blocking for {} responses", blockfor);
+        boolean repair = randomlyReadRepair();
+        this.endpoints = repair || resolver instanceof RowRepairResolver
+                       ? endpoints
+                       : endpoints.subList(0, Math.min(endpoints.size(), blockfor)); // min so as to not throw exception until assureSufficient is called
+
+        if (logger.isDebugEnabled())
+            logger.debug(String.format("Blockfor/repair is %s/%s; setting up requests to %s",
+                                       blockfor, repair, StringUtils.join(this.endpoints, ",")));
     }
     
+    private boolean randomlyReadRepair()
+    {
+        if (resolver instanceof RowDigestResolver)
+        {
+            assert command instanceof ReadCommand : command;
+            String table = ((RowDigestResolver) resolver).table;
+            String columnFamily = ((ReadCommand) command).getColumnFamilyName();
+            CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(table).get(columnFamily);
+            return cfmd.getReadRepairChance() > random.get().nextDouble();
+        }
+        // we don't read repair on range scans
+        return false;
+    }
+
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -85,21 +124,42 @@ public class ReadCallback<T> implements 
     public void response(Message message)
     {
         resolver.preprocess(message);
+        assert resolver.getMessageCount() <= endpoints.size();
         if (resolver.getMessageCount() < blockfor)
             return;
         if (resolver.isDataPresent())
+        {
             condition.signal();
+            maybeResolveForRepair();
+        }
     }
 
     public void response(ReadResponse result)
     {
-        ((ReadResponseResolver) resolver).injectPreProcessed(result);
+        ((RowDigestResolver) resolver).injectPreProcessed(result);
+        assert resolver.getMessageCount() <= endpoints.size();
         if (resolver.getMessageCount() < blockfor)
             return;
         if (resolver.isDataPresent())
+        {
             condition.signal();
+            maybeResolveForRepair();
+        }
     }
-    
+
+    /**
+     * Check digests in the background on the Repair stage if we've received replies
+     * to all the requests we sent.
+     */
+    protected void maybeResolveForRepair()
+    {
+        if (blockfor < endpoints.size() && resolver.getMessageCount() == endpoints.size())
+        {
+            assert resolver.isDataPresent();
+            StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
+        }
+    }
+
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
     {
         switch (consistencyLevel)
@@ -116,9 +176,37 @@ public class ReadCallback<T> implements 
         }
     }
 
-    public void assureSufficientLiveNodes(Collection<InetAddress> endpoints) throws UnavailableException
+    public void assureSufficientLiveNodes() throws UnavailableException
     {
         if (endpoints.size() < blockfor)
             throw new UnavailableException();
     }
+
+    public boolean isLatencyForSnitch()
+    {
+        return true;
+    }
+
+    private class AsyncRepairRunner extends WrappedRunnable
+    {
+        protected void runMayThrow() throws IOException
+        {
+            try
+            {
+                resolver.resolve();
+            }
+            catch (DigestMismatchException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Digest mismatch:", e);
+
+                ReadCommand readCommand = (ReadCommand) command;
+                final RowRepairResolver repairResolver = new RowRepairResolver(readCommand.table, readCommand.key);
+                IAsyncCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
+
+                for (InetAddress endpoint : endpoints)
+                    MessagingService.instance().sendRR(readCommand, endpoint, repairHandler);
+            }
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RepairCallback.java Wed Feb 16 20:12:37 2011
@@ -39,6 +39,14 @@ public class RepairCallback<T> implement
     private final SimpleCondition condition = new SimpleCondition();
     private final long startTime;
 
+    /**
+     * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
+     * it needs to achieve.  Repair on the other hand is happy to repair whoever replies within the timeout.
+     *
+     * (The other main difference of course is, this is only created once we know we have a digest
+     * mismatch, and we're going to do full-data reads from everyone -- that is, this is the final
+     * stage in the read process.)
+     */
     public RepairCallback(IResponseResolver<T> resolver, List<InetAddress> endpoints)
     {
         this.resolver = resolver;
@@ -46,10 +54,6 @@ public class RepairCallback<T> implement
         this.startTime = System.currentTimeMillis();
     }
 
-    /**
-     * The main difference between this and ReadCallback is, ReadCallback has a ConsistencyLevel
-     * it needs to achieve.  Repair on the other hand is happy to repair whoever replies within the timeout.
-     */
     public T get() throws TimeoutException, DigestMismatchException, IOException
     {
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
@@ -71,4 +75,9 @@ public class RepairCallback<T> implement
         if (resolver.getMessageCount() == endpoints.size())
             condition.signal();
     }
+
+    public boolean isLatencyForSnitch()
+    {
+        return true;
+    }
 }

Added: cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowDigestResolver.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ReadResponse;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.net.Message;
+
+public class RowDigestResolver extends AbstractRowResolver
+{
+    public RowDigestResolver(String table, ByteBuffer key)
+    {
+        super(key, table);
+    }
+    
+    public Row getData() throws IOException
+    {
+        for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+        {
+            ReadResponse result = entry.getValue();
+            if (!result.isDigestQuery())
+                return result.row();
+        }
+
+        throw new AssertionError("getData should not be invoked when no data is present");
+    }
+
+    /*
+     * This method handles two different scenarios:
+     *
+     * 1a)we're handling the initial read, of data from the closest replica + digests
+     *    from the rest.  In this case we check the digests against each other,
+     *    throw an exception if there is a mismatch, otherwise return the data row.
+     *
+     * 1b)we're checking additional digests that arrived after the minimum to handle
+     *    the requested ConsistencyLevel, i.e. asynchronous read repair check
+     */
+    public Row resolve() throws DigestMismatchException, IOException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("resolving " + replies.size() + " responses");
+
+        long startTime = System.currentTimeMillis();
+		ColumnFamily data = null;
+
+        // case 1: validate digests against each other; throw immediately on mismatch.
+        // also, collects data results into versions/endpoints lists.
+        //
+        // results are cleared as we process them, to avoid unnecessary duplication of work
+        // when resolve() is called a second time for read repair on responses that were not
+        // necessary to satisfy ConsistencyLevel.
+        ByteBuffer digest = null;
+        for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+        {
+            ReadResponse response = entry.getValue();
+            if (response.isDigestQuery())
+            {
+                if (digest == null)
+                {
+                    digest = response.digest();
+                }
+                else
+                {
+                    ByteBuffer digest2 = response.digest();
+                    if (!digest.equals(digest2))
+                        throw new DigestMismatchException(key, digest, digest2);
+                }
+            }
+            else
+            {
+                data = response.row().cf;
+            }
+        }
+
+		// If there was a digest query compare it with all the data digests
+		// If there is a mismatch then throw an exception so that read repair can happen.
+        //
+        // It's important to note that we do not compare the digests of multiple data responses --
+        // if we are in that situation we know there was a previous mismatch and now we're doing a repair,
+        // so our job is now case 2: figure out what the most recent version is and update everyone to that version.
+        if (digest != null)
+        {
+            ByteBuffer digest2 = ColumnFamily.digest(data);
+            if (!digest.equals(digest2))
+                throw new DigestMismatchException(key, digest, digest2);
+            if (logger.isDebugEnabled())
+                logger.debug("digests verified");
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+		return new Row(key, data);
+	}
+
+    public boolean isDataPresent()
+	{
+        for (ReadResponse result : replies.values())
+        {
+            if (!result.isDigestQuery())
+                return true;
+        }
+        return false;
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+
+public class RowRepairResolver extends AbstractRowResolver
+{
+    public RowRepairResolver(String table, ByteBuffer key)
+    {
+        super(key, table);
+    }
+
+    /*
+    * This method handles the following scenario:
+    *
+    * there was a mismatch on the initial read (1a or 1b), so we redid the digest requests
+    * as full data reads.  In this case we need to compute the most recent version
+    * of each column, and send diffs to out-of-date replicas.
+    */
+    public Row resolve() throws DigestMismatchException, IOException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("resolving " + replies.size() + " responses");
+
+        long startTime = System.currentTimeMillis();
+		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
+		List<InetAddress> endpoints = new ArrayList<InetAddress>();
+
+        // case 1: validate digests against each other; throw immediately on mismatch.
+        // also, collects data results into versions/endpoints lists.
+        //
+        // results are cleared as we process them, to avoid unnecessary duplication of work
+        // when resolve() is called a second time for read repair on responses that were not
+        // necessary to satisfy ConsistencyLevel.
+        for (Map.Entry<Message, ReadResponse> entry : replies.entrySet())
+        {
+            Message message = entry.getKey();
+            ReadResponse response = entry.getValue();
+            assert !response.isDigestQuery();
+            versions.add(response.row().cf);
+            endpoints.add(message.getFrom());
+        }
+
+        ColumnFamily resolved;
+        if (versions.size() > 1)
+        {
+            resolved = resolveSuperset(versions);
+            if (logger.isDebugEnabled())
+                logger.debug("versions merged");
+            maybeScheduleRepairs(resolved, table, key, versions, endpoints);
+        }
+        else
+        {
+            resolved = versions.get(0);
+        }
+
+        if (logger.isDebugEnabled())
+            logger.debug("resolve: " + (System.currentTimeMillis() - startTime) + " ms.");
+		return new Row(key, resolved);
+	}
+
+    /**
+     * For each row version, compare with resolved (the superset of all row versions);
+     * if it is missing anything, send a mutation to the endpoint it came from.
+     */
+    public static void maybeScheduleRepairs(ColumnFamily resolved, String table, DecoratedKey key, List<ColumnFamily> versions, List<InetAddress> endpoints)
+    {
+        for (int i = 0; i < versions.size(); i++)
+        {
+            ColumnFamily diffCf = ColumnFamily.diff(versions.get(i), resolved);
+            if (diffCf == null) // no repair needs to happen
+                continue;
+
+            // create and send the row mutation message based on the diff
+            RowMutation rowMutation = new RowMutation(table, key.key);
+            rowMutation.add(diffCf);
+            Message repairMessage;
+            try
+            {
+                repairMessage = rowMutation.getMessage(Gossiper.instance.getVersion(endpoints.get(i)));
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            MessagingService.instance().sendOneWay(repairMessage, endpoints.get(i));
+        }
+    }
+
+    static ColumnFamily resolveSuperset(List<ColumnFamily> versions)
+    {
+        assert versions.size() > 0;
+
+        ColumnFamily resolved = null;
+        for (ColumnFamily cf : versions)
+        {
+            if (cf != null)
+            {
+                resolved = cf.cloneMe();
+                break;
+            }
+        }
+        if (resolved == null)
+            return null;
+
+        for (ColumnFamily cf : versions)
+            resolved.resolve(cf);
+
+        return resolved;
+    }
+
+    public Row getData() throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isDataPresent()
+	{
+        throw new UnsupportedOperationException();
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Feb 16 20:12:37 2011
@@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -66,17 +65,6 @@ public class StorageProxy implements Sto
 {
     private static final Logger logger = LoggerFactory.getLogger(StorageProxy.class);
 
-    private static ScheduledExecutorService repairExecutor = new ScheduledThreadPoolExecutor(1); // TODO JMX-enable this
-
-    private static final ThreadLocal<Random> random = new ThreadLocal<Random>()
-    {
-        @Override
-        protected Random initialValue()
-        {
-            return new Random();
-        }
-    };
-
     // mbean stuff
     private static final LatencyTracker readStats = new LatencyTracker();
     private static final LatencyTracker rangeStats = new LatencyTracker();
@@ -91,7 +79,10 @@ public class StorageProxy implements Sto
     private static final WritePerformer standardWritePerformer;
     private static final WritePerformer counterWritePerformer;
 
+    public static final StorageProxy instance = new StorageProxy();
+
     private StorageProxy() {}
+
     static
     {
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -208,7 +199,7 @@ public class StorageProxy implements Sto
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
         Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
-        MessageProducer prod = new CachingMessageProducer(rm);
+        MessageProducer producer = new CachingMessageProducer(rm);
 
         for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
         {
@@ -238,7 +229,7 @@ public class StorageProxy implements Sto
                        dcMessages.put(dc, messages);
                     }
 
-                    messages.put(prod.getMessage(Gossiper.instance.getVersion(destination)), destination);
+                    messages.put(producer.getMessage(Gossiper.instance.getVersion(destination)), destination);
                 }
             }
             else
@@ -506,109 +497,97 @@ public class StorageProxy implements Sto
     private static List<Row> fetchRows(List<ReadCommand> commands, ConsistencyLevel consistency_level) throws IOException, UnavailableException, TimeoutException
     {
         List<ReadCallback<Row>> readCallbacks = new ArrayList<ReadCallback<Row>>();
-        List<List<InetAddress>> commandEndpoints = new ArrayList<List<InetAddress>>();
         List<Row> rows = new ArrayList<Row>();
-        Set<ReadCommand> repairs = new HashSet<ReadCommand>();
 
         // send out read requests
         for (ReadCommand command: commands)
         {
             assert !command.isDigestQuery();
+            logger.debug("Command/ConsistencyLevel is {}/{}", command, consistency_level);
 
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), endpoints);
 
-            ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
-            ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
-            handler.assureSufficientLiveNodes(endpoints);
-
-            // if we're not going to read repair, cut the endpoints list down to the ones required to satisfy ConsistencyLevel
-            if (randomlyReadRepair(command))
-            {
-                if (endpoints.size() > handler.blockfor)
-                    repairs.add(command);
-            }
-            else
-            {
-                endpoints = endpoints.subList(0, handler.blockfor);
-            }
+            RowDigestResolver resolver = new RowDigestResolver(command.table, command.key);
+            ReadCallback<Row> handler = getReadCallback(resolver, command, consistency_level, endpoints);
+            handler.assureSufficientLiveNodes();
+            assert !handler.endpoints.isEmpty();
 
             // The data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
             ReadCommand digestCommand = null;
-            if (endpoints.size() > 1)
+            if (handler.endpoints.size() > 1)
             {
                 digestCommand = command.copy();
                 digestCommand.setDigestQuery(true);
             }
 
-            InetAddress dataPoint = endpoints.get(0);
+            InetAddress dataPoint = handler.endpoints.get(0);
             if (dataPoint.equals(FBUtilities.getLocalAddress()))
             {
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data for " + command + " locally");
+                    logger.debug("reading data locally");
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
             }
             else
             {
-                Message message = command.getMessage(Gossiper.instance.getVersion(dataPoint));
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data for " + command + " from " + dataPoint);
-                MessagingService.instance().sendRR(message, dataPoint, handler);
+                    logger.debug("reading data from " + dataPoint);
+                MessagingService.instance().sendRR(command, dataPoint, handler);
             }
 
             // We lazy-construct the digest Message object since it may not be necessary if we
             // are doing a local digest read, or no digest reads at all.
-            MessageProducer prod = new CachingMessageProducer(digestCommand);
-            for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
+            MessageProducer producer = new CachingMessageProducer(digestCommand);
+            for (InetAddress digestPoint : handler.endpoints.subList(1, handler.endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
                 {
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest for " + command + " locally");
+                        logger.debug("reading digest locally");
                     StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
                 }
                 else
                 {
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest for " + command + " from " + digestPoint);
-                    MessagingService.instance().sendRR(prod, digestPoint, handler);
+                        logger.debug("reading digest for from " + digestPoint);
+                    MessagingService.instance().sendRR(producer, digestPoint, handler);
                 }
             }
 
             readCallbacks.add(handler);
-            commandEndpoints.add(endpoints);
         }
 
         // read results and make a second pass for any digest mismatches
         List<RepairCallback<Row>> repairResponseHandlers = null;
         for (int i = 0; i < commands.size(); i++)
         {
-            ReadCallback<Row> readCallback = readCallbacks.get(i);
+            ReadCallback<Row> handler = readCallbacks.get(i);
             Row row;
             ReadCommand command = commands.get(i);
-            List<InetAddress> endpoints = commandEndpoints.get(i);
             try
             {
                 long startTime2 = System.currentTimeMillis();
-                row = readCallback.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
+                row = handler.get(); // CL.ONE is special cased here to ignore digests even if some have arrived
                 if (row != null)
                     rows.add(row);
 
                 if (logger.isDebugEnabled())
                     logger.debug("Read: " + (System.currentTimeMillis() - startTime2) + " ms.");
-
-                if (repairs.contains(command))
-                    repairExecutor.schedule(new RepairRunner(readCallback.resolver, command, endpoints), DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             }
             catch (DigestMismatchException ex)
             {
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
-                RepairCallback<Row> handler = repair(command, endpoints);
+
+                RowRepairResolver resolver = new RowRepairResolver(command.table, command.key);
+                RepairCallback<Row> repairHandler = new RepairCallback<Row>(resolver, handler.endpoints);
+                for (InetAddress endpoint : handler.endpoints)
+                    MessagingService.instance().sendRR(command, endpoint, repairHandler);
+
                 if (repairResponseHandlers == null)
                     repairResponseHandlers = new ArrayList<RepairCallback<Row>>();
-                repairResponseHandlers.add(handler);
+                repairResponseHandlers.add(repairHandler);
             }
         }
 
@@ -657,24 +636,13 @@ public class StorageProxy implements Sto
         }
     }
 
-    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
+    static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
     {
         if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
         {
-            return new DatacenterReadCallback(resolver, consistencyLevel, table);
+            return new DatacenterReadCallback(resolver, consistencyLevel, command, endpoints);
         }
-        return new ReadCallback(resolver, consistencyLevel, table);
-    }
-
-    private static RepairCallback<Row> repair(ReadCommand command, List<InetAddress> endpoints)
-    throws IOException
-    {
-        ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
-        RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
-        MessageProducer prod = new CachingMessageProducer(command);
-        for (InetAddress endpoint : endpoints)
-            MessagingService.instance().sendRR(prod, endpoint, handler);
-        return handler;
+        return new ReadCallback(resolver, consistencyLevel, command, endpoints);
     }
 
     /*
@@ -725,16 +693,14 @@ public class StorageProxy implements Sto
 
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
-                    ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
-                    MessageProducer prod = new CachingMessageProducer(c2);
-                    // TODO bail early if live endpoints can't satisfy requested consistency level
+                    ReadCallback<List<Row>> handler = getReadCallback(resolver, command, consistency_level, liveEndpoints);
+                    handler.assureSufficientLiveNodes();
                     for (InetAddress endpoint : liveEndpoints)
                     {
-                        MessagingService.instance().sendRR(prod, endpoint, handler);
+                        MessagingService.instance().sendRR(c2, endpoint, handler);
                         if (logger.isDebugEnabled())
                             logger.debug("reading " + c2 + " from " + endpoint);
                     }
-                    // TODO read repair on remaining replicas?
 
                     // if we're done, great, otherwise, move to the next range
                     try
@@ -787,6 +753,11 @@ public class StorageProxy implements Sto
                 versions.put(message.getFrom(), theirVersion);
                 latch.countDown();
             }
+
+            public boolean isLatencyForSnitch()
+            {
+                return false;
+            }
         };
         // an empty message acts as a request to the SchemaCheckVerbHandler.
         for (InetAddress endpoint : liveHosts)
@@ -881,12 +852,6 @@ public class StorageProxy implements Sto
         return ranges;
     }
 
-    private static boolean randomlyReadRepair(ReadCommand command)
-    {
-        CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
-        return cfmd.getReadRepairChance() > random.get().nextDouble();
-    }
-
     public long getReadOperations()
     {
         return readStats.getOpCount();
@@ -987,7 +952,7 @@ public class StorageProxy implements Sto
         return counterWriteStats.getRecentLatencyHistogramMicros();
     }
 
-    public static List<Row> scan(String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+    public static List<Row> scan(final String keyspace, String column_family, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws IOException, TimeoutException, UnavailableException
     {
         IPartitioner p = StorageService.getPartitioner();
@@ -1005,17 +970,21 @@ public class StorageProxy implements Sto
 
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
-            ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level);
-
-            // bail early if live endpoints can't satisfy requested consistency level
-            if(handler.blockfor > liveEndpoints.size())
-                throw new UnavailableException();
+            IReadCommand iCommand = new IReadCommand()
+            {
+                public String getKeyspace()
+                {
+                    return keyspace;
+                }
+            };
+            ReadCallback<List<Row>> handler = getReadCallback(resolver, iCommand, consistency_level, liveEndpoints);
+            handler.assureSufficientLiveNodes();
 
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
-            MessageProducer prod = new CachingMessageProducer(command);
+            MessageProducer producer = new CachingMessageProducer(command);
             for (InetAddress endpoint : liveEndpoints)
             {
-                MessagingService.instance().sendRR(prod, endpoint, handler);
+                MessagingService.instance().sendRR(producer, endpoint, handler);
                 if (logger.isDebugEnabled())
                     logger.debug("reading " + command + " from " + endpoint);
             }
@@ -1100,11 +1069,9 @@ public class StorageProxy implements Sto
         // Send out the truncate calls and track the responses with the callbacks.
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         final Truncation truncation = new Truncation(keyspace, cfname);
-        MessageProducer prod = new CachingMessageProducer(truncation);
+        MessageProducer producer = new CachingMessageProducer(truncation);
         for (InetAddress endpoint : allEndpoints)
-        {
-            MessagingService.instance().sendRR(prod, endpoint, responseHandler);
-        }
+            MessagingService.instance().sendRR(producer, endpoint, responseHandler);
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);
@@ -1121,42 +1088,6 @@ public class StorageProxy implements Sto
         return !Gossiper.instance.getUnreachableMembers().isEmpty();
     }
 
-    private static class RepairRunner extends WrappedRunnable
-    {
-        private final IResponseResolver<Row> resolver;
-        private final ReadCommand command;
-        private final List<InetAddress> endpoints;
-
-        public RepairRunner(IResponseResolver<Row> resolver, ReadCommand command, List<InetAddress> endpoints)
-        {
-            this.resolver = resolver;
-            this.command = command;
-            this.endpoints = endpoints;
-        }
-
-        protected void runMayThrow() throws IOException
-        {
-            try
-            {
-                resolver.resolve();
-            }
-            catch (DigestMismatchException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Digest mismatch:", e);
-                final RepairCallback<Row> callback = repair(command, endpoints);
-                Runnable runnable = new WrappedRunnable()
-                {
-                    public void runMayThrow() throws DigestMismatchException, IOException, TimeoutException
-                    {
-                        callback.get();
-                    }
-                };
-                repairExecutor.schedule(runnable, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
     private interface WritePerformer
     {
         public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/TruncateResponseHandler.java Wed Feb 16 20:12:37 2011
@@ -73,4 +73,9 @@ public class TruncateResponseHandler imp
         if (responses.get() >= responseCount)
             condition.signal();
     }
+
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Wed Feb 16 20:12:37 2011
@@ -121,4 +121,9 @@ public class WriteResponseHandler extend
             throw new UnavailableException();
         }
     }
+
+    public boolean isLatencyForSnitch()
+    {
+        return false;
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1071380&r1=1071379&r2=1071380&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Wed Feb 16 20:12:37 2011
@@ -71,7 +71,7 @@ public class ConsistencyLevelTest extend
 
         AbstractReplicationStrategy strategy;
 
-        for (String table : DatabaseDescriptor.getNonSystemTables())
+        for (final String table : DatabaseDescriptor.getNonSystemTables())
         {
             strategy = getStrategy(table, tmd);
             StorageService.calculatePendingRanges(strategy, table);
@@ -96,7 +96,15 @@ public class ConsistencyLevelTest extend
 
                     IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
 
-                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), table, c);
+                    IReadCommand command = new IReadCommand()
+                    {
+                        public String getKeyspace()
+                        {
+                            return table;
+                        }
+                    };
+                    RowRepairResolver resolver = new RowRepairResolver(table, ByteBufferUtil.bytes("foo"));
+                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, new ArrayList<InetAddress>(hintedNodes.keySet()));
 
                     boolean isWriteUnavailable = false;
                     boolean isReadUnavailable = false;
@@ -111,7 +119,7 @@ public class ConsistencyLevelTest extend
 
                     try
                     {
-                        readHandler.assureSufficientLiveNodes(hintedNodes.asMap().keySet());
+                        readHandler.assureSufficientLiveNodes();
                     }
                     catch (UnavailableException e)
                     {

Added: cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java?rev=1071380&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RowResolverTest.java Wed Feb 16 20:12:37 2011
@@ -0,0 +1,96 @@
+package org.apache.cassandra.service;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
+
+import java.util.Arrays;
+
+import org.apache.cassandra.SchemaLoader;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamily;
+
+import static org.apache.cassandra.db.TableTest.assertColumns;
+import static org.apache.cassandra.Util.column;
+import static junit.framework.Assert.assertNull;
+
+public class RowResolverTest extends SchemaLoader
+{
+    @Test
+    public void testResolveSupersetNewer()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c1", "v2", 1));
+
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c1");
+        assertNull(ColumnFamily.diff(cf2, resolved));
+    }
+
+    @Test
+    public void testResolveSupersetDisjoint()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c2", "v2", 1));
+
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, cf2));
+        assertColumns(resolved, "c1", "c2");
+        assertColumns(ColumnFamily.diff(cf1, resolved), "c2");
+        assertColumns(ColumnFamily.diff(cf2, resolved), "c1");
+    }
+
+    @Test
+    public void testResolveSupersetNullOne()
+    {
+        ColumnFamily cf2 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf2.addColumn(column("c2", "v2", 1));
+
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(null, cf2));
+        assertColumns(resolved, "c2");
+        assertColumns(ColumnFamily.diff(null, resolved), "c2");
+        assertNull(ColumnFamily.diff(cf2, resolved));
+    }
+
+    @Test
+    public void testResolveSupersetNullTwo()
+    {
+        ColumnFamily cf1 = ColumnFamily.create("Keyspace1", "Standard1");
+        cf1.addColumn(column("c1", "v1", 0));
+
+        ColumnFamily resolved = RowRepairResolver.resolveSuperset(Arrays.asList(cf1, null));
+        assertColumns(resolved, "c1");
+        assertNull(ColumnFamily.diff(cf1, resolved));
+        assertColumns(ColumnFamily.diff(null, resolved), "c1");
+    }
+
+    @Test
+    public void testResolveSupersetNullBoth()
+    {
+        assertNull(RowRepairResolver.resolveSuperset(Arrays.<ColumnFamily>asList(null, null)));
+    }
+}



Mime
View raw message