cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1051667 - in /cassandra/trunk: ./ contrib/py_stress/ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ test/unit/org/apache/ca...
Date Tue, 21 Dec 2010 21:45:56 GMT
Author: jbellis
Date: Tue Dec 21 21:45:56 2010
New Revision: 1051667

URL: http://svn.apache.org/viewvc?rev=1051667&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/contrib/py_stress/stress.py
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
    cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1050987
-/cassandra/branches/cassandra-0.7:1026517-1050989
+/cassandra/branches/cassandra-0.6:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7:1026517-1051666
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Dec 21 21:45:56 2010
@@ -23,6 +23,8 @@
  * flush before repair (CASSANDRA-1748)
  * SSTableExport validates key order (CASSANDRA-1884)
  * Re-cache hot keys post-compaction without hitting disk (CASSANDRA-1878)
+ * manage read repair in coordinator instead of data source, to
+   provide latency information to dynamic snitch (CASSANDRA-1873)
 
 
 0.7.0-rc2

Modified: cassandra/trunk/contrib/py_stress/stress.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/contrib/py_stress/stress.py?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/contrib/py_stress/stress.py (original)
+++ cassandra/trunk/contrib/py_stress/stress.py Tue Dec 21 21:45:56 2010
@@ -24,6 +24,7 @@ have_multiproc = False
 try:
     from multiprocessing import Array as array, Process as Thread
     from uuid import uuid1 as get_ident
+    array('i', 1) # catch "This platform lacks a functioning sem_open implementation"
     Thread.isAlive = Thread.is_alive
     have_multiproc = True
 except ImportError:

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026517-1051666
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026517-1051666
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026517-1051666
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026517-1051666
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Dec 21 21:45:56 2010
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1050987
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1050989
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1051640,1051662
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026517-1051666
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 /incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadCommand.java Tue Dec 21 21:45:56 2010
@@ -36,7 +36,6 @@ import org.apache.cassandra.utils.FBUtil
 
 public abstract class ReadCommand
 {
-    public static final String DO_REPAIR = "READ-REPAIR";
     public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
     public static final byte CMD_TYPE_GET_SLICE = 2;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadResponse.java Tue Dec 21 21:45:56 2010
@@ -59,6 +59,7 @@ private static ICompactSerializer<ReadRe
 
 	public ReadResponse(Row row)
     {
+        assert row != null;
 		row_ = row;
 	}
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ReadVerbHandler.java Tue Dec 21 21:45:56 2010
@@ -96,14 +96,6 @@ public class ReadVerbHandler implements 
               logger_.debug(String.format("Read key %s; sending response to %s@%s",
                                           FBUtilities.bytesToHex(command.key), message.getMessageId(), message.getFrom()));
             MessagingService.instance.sendOneWay(response, message.getFrom());
-
-            /* Do read repair if header of the message says so */
-            if (message.getHeader(ReadCommand.DO_REPAIR) != null)
-            {
-                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                if (endpoints.size() > 1)
-                    StorageService.instance.doConsistencyCheck(row, endpoints, command);
-            }
         }
         catch (IOException ex)
         {

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Tue Dec 21 21:45:56 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.net;
 
+import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,6 +37,7 @@ class AsyncResult implements IAsyncResul
     private Lock lock = new ReentrantLock();
     private Condition condition;
     private long startTime;
+    private InetAddress from;
 
     public AsyncResult()
     {        
@@ -74,14 +76,15 @@ class AsyncResult implements IAsyncResul
         }
         return result;
     }
-      
+
     public void result(Message response)
     {        
         try
         {
             lock.lock();
             if (!done.get())
-            {                
+            {
+                from = response.getFrom();
                 result = response.getMessageBody();
                 done.set(true);
                 condition.signal();
@@ -91,5 +94,10 @@ class AsyncResult implements IAsyncResul
         {
             lock.unlock();
         }        
-    }    
+    }
+
+    public InetAddress getFrom()
+    {
+        return from;
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Tue Dec 21 21:45:56 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.net;
 
+import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -38,4 +39,6 @@ public interface IAsyncResult
      * @param result the response message
      */
     public void result(Message result);
+
+    public InetAddress getFrom();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ConsistencyChecker.java Tue Dec 21 21:45:56 2010
@@ -41,7 +41,6 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -55,7 +54,7 @@ import org.apache.cassandra.utils.Wrappe
  * (1) sends DIGEST read requests to each other replica of the given row.
  *
  * [DigestResponseHandler]
- * (2) If any of the digests to not match the local one, it sends a second round of requests
+ * (2) If any of the digests to not match the data read, it sends a second round of requests
  * to each replica, this time for the full data
  *
  * [DataRepairHandler]
@@ -68,18 +67,17 @@ class ConsistencyChecker implements Runn
 
     private static ScheduledExecutorService executor_ = new ScheduledThreadPoolExecutor(1); // TODO add JMX
 
-    private final String table_;
     private final Row row_;
     protected final List<InetAddress> replicas_;
     private final ReadCommand readCommand_;
+    private final InetAddress dataSource;
 
-    public ConsistencyChecker(String table, Row row, List<InetAddress> endpoints, ReadCommand readCommand)
+    public ConsistencyChecker(ReadCommand command, Row row, List<InetAddress> endpoints, InetAddress dataSource)
     {
-        table_ = table;
         row_ = row;
         replicas_ = endpoints;
-        readCommand_ = readCommand;
-        assert replicas_.contains(FBUtilities.getLocalAddress());
+        readCommand_ = command;
+        this.dataSource = dataSource;
     }
 
     public void run()
@@ -94,7 +92,7 @@ class ConsistencyChecker implements Runn
             MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
             for (InetAddress endpoint : replicas_)
             {
-                if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                if (!endpoint.equals(dataSource))
                     MessagingService.instance.sendOneWay(message, endpoint);
             }
 		}
@@ -130,17 +128,14 @@ class ConsistencyChecker implements Runn
 
                 if (!localDigest.equals(digest))
                 {
-                    ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_);
-                    IAsyncCallback responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
-
                     ReadCommand readCommand = constructReadMessage(false);
                     Message message = readCommand.makeReadMessage();
                     if (logger_.isDebugEnabled())
                         logger_.debug("Digest mismatch; re-reading " + readCommand_.key + " from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");
-                    MessagingService.instance.addCallback(responseHandler, message.getMessageId());
+                    MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId());
                     for (InetAddress endpoint : replicas_)
                     {
-                        if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                        if (!endpoint.equals(dataSource))
                             MessagingService.instance.sendOneWay(message, endpoint);
                     }
 
@@ -154,18 +149,18 @@ class ConsistencyChecker implements Runn
         }
     }
 
-	static class DataRepairHandler implements IAsyncCallback
+    class DataRepairHandler implements IAsyncCallback
 	{
 		private final ReadResponseResolver readResponseResolver_;
 		private final int majority_;
 		
-        public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException
+        public DataRepairHandler() throws IOException
         {
-            readResponseResolver_ = readResponseResolver;
-            majority_ = (responseCount / 2) + 1;
-            // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
-            ReadResponse readResponse = new ReadResponse(localRow);
-            Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+            readResponseResolver_ = new ReadResponseResolver(readCommand_.table, readCommand_.key);
+            majority_ = (replicas_.size() / 2) + 1;
+            // wrap original data Row in a response Message so it doesn't need to be special-cased in the resolver
+            ReadResponse readResponse = new ReadResponse(row_);
+            Message fakeMessage = new Message(dataSource, StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
             readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/GCInspector.java Tue Dec 21 21:45:56 2010
@@ -42,12 +42,12 @@ import org.apache.cassandra.net.Messagin
 
 public class GCInspector
 {
-    public static final GCInspector instance = new GCInspector();
-
     private static final Logger logger = LoggerFactory.getLogger(GCInspector.class);
     final static long INTERVAL_IN_MS = 1000;
     final static long MIN_DURATION = 200;
     final static long MIN_DURATION_TPSTATS = 1000;
+    
+    public static final GCInspector instance = new GCInspector();
 
     private HashMap<String, Long> gctimes = new HashMap<String, Long>();
     private final MBeanServer server = ManagementFactory.getPlatformMBeanServer();

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadResponseResolver.java Tue Dec 21 21:45:56 2010
@@ -45,10 +45,12 @@ public class ReadResponseResolver implem
 	private static Logger logger_ = LoggerFactory.getLogger(ReadResponseResolver.class);
     private final String table;
     private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message, ReadResponse>();
+    private DecoratedKey key;
 
-    public ReadResponseResolver(String table)
+    public ReadResponseResolver(String table, ByteBuffer key)
     {
         this.table = table;
+        this.key = StorageService.getPartitioner().decorateKey(key);
     }
     
     /*
@@ -66,9 +68,8 @@ public class ReadResponseResolver implem
         long startTime = System.currentTimeMillis();
 		List<ColumnFamily> versions = new ArrayList<ColumnFamily>();
 		List<InetAddress> endpoints = new ArrayList<InetAddress>();
-		DecoratedKey key = null;
 		ByteBuffer digest = null;
-        
+
         /*
 		 * Populate the list of rows from each of the messages
 		 * Check to see if there is a digest query. If a digest 
@@ -96,7 +97,6 @@ public class ReadResponseResolver implem
             {
                 versions.add(result.row().cf);
                 endpoints.add(message.getFrom());
-                key = result.row().key;
             }
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Tue Dec 21 21:45:56 2010
@@ -240,7 +240,7 @@ public class StorageProxy implements Sto
 
         // send off all the commands asynchronously
         List<Future<Object>> localFutures = null;
-        List<IAsyncResult> remoteResults = null;
+        HashMap<ReadCommand, IAsyncResult> remoteResults = null;
         for (ReadCommand command: commands)
         {
             InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
@@ -257,13 +257,11 @@ public class StorageProxy implements Sto
             else
             {
                 if (remoteResults == null)
-                    remoteResults = new ArrayList<IAsyncResult>();
+                    remoteResults = new HashMap<ReadCommand, IAsyncResult>();
                 Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
                     logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
-                if (randomlyReadRepair(command))
-                    message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
-                remoteResults.add(MessagingService.instance.sendRR(message, endPoint));
+                remoteResults.put(command, MessagingService.instance.sendRR(message, endPoint));
             }
         }
 
@@ -286,14 +284,18 @@ public class StorageProxy implements Sto
         }
         if (remoteResults != null)
         {
-            for (IAsyncResult iar: remoteResults)
+            for (Map.Entry<ReadCommand, IAsyncResult> entry : remoteResults.entrySet())
             {
+                ReadCommand command = entry.getKey();
+                IAsyncResult iar = entry.getValue();
                 byte[] body;
                 body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
                 ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
                 ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (response.row() != null)
-                    rows.add(response.row());
+                assert response.row() != null;
+                rows.add(response.row());
+                if (randomlyReadRepair(command))
+                    StorageService.instance.doConsistencyCheck(response.row(), command, iar.getFrom());
             }
         }
 
@@ -331,7 +333,8 @@ public class StorageProxy implements Sto
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
 
             AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-            QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
+            ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+            QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
             handler.assureSufficientLiveNodes(endpoints);
 
             Message messages[] = new Message[endpoints.size()];
@@ -370,7 +373,8 @@ public class StorageProxy implements Sto
             catch (DigestMismatchException ex)
             {
                 AbstractReplicationStrategy rs = Table.open(command.table).getReplicationStrategy();
-                QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(new ReadResponseResolver(command.table), consistency_level);
+                ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
+                QuorumResponseHandler<Row> handler = rs.getQuorumResponseHandler(resolver, consistency_level);
                 if (logger.isDebugEnabled())
                     logger.debug("Digest mismatch:", ex);
                 Message messageRepair = command.makeReadMessage();
@@ -735,11 +739,7 @@ public class StorageProxy implements Sto
 
             // Do the consistency checks in the background
             if (randomlyReadRepair(command))
-            {
-                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                if (endpoints.size() > 1)
-                    StorageService.instance.doConsistencyCheck(row, endpoints, command);
-            }
+                StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress());
 
             return row;
         }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Dec 21 21:45:56 2010
@@ -485,15 +485,17 @@ public class StorageService implements I
     {
         return tokenMetadata_;
     }
-    
+
     /**
      * This method performs the requisite operations to make
      * sure that the N replicas are in sync. We do this in the
      * background when we do not care much about consistency.
      */
-    public void doConsistencyCheck(Row row, List<InetAddress> endpoints, ReadCommand command)
+    public void doConsistencyCheck(Row row, ReadCommand command, InetAddress dataSource)
     {
-        consistencyManager_.submit(new ConsistencyChecker(command.table, row, endpoints, command));
+        List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+        if (endpoints.size() > 1)
+            consistencyManager_.submit(new ConsistencyChecker(command, row, endpoints, dataSource));
     }
 
     /**

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1051667&r1=1051666&r2=1051667&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Tue Dec 21 21:45:56 2010
@@ -41,6 +41,7 @@ import org.apache.cassandra.locator.Simp
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -95,7 +96,7 @@ public class ConsistencyLevelTest extend
 
                     IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
 
-                    QuorumResponseHandler<Row> readHandler = strategy.getQuorumResponseHandler(new ReadResponseResolver(table), c);
+                    QuorumResponseHandler<Row> readHandler = strategy.getQuorumResponseHandler(new ReadResponseResolver(table, ByteBufferUtil.bytes("foo")), c);
 
                     boolean isWriteUnavailable = false;
                     boolean isReadUnavailable = false;



Mime
View raw message