cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1051640 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
Date Tue, 21 Dec 2010 20:41:52 GMT
Author: jbellis
Date: Tue Dec 21 20:41:47 2010
New Revision: 1051640

URL: http://svn.apache.org/viewvc?rev=1051640&view=rev
Log:
manage read repair in coordinator instead of data source, to provide latency information to dynamic snitch
patch by jbellis; reviewed by brandonwilliams for CASSANDRA-1873

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Dec 21 20:41:47 2010
@@ -26,6 +26,8 @@
  * return InvalidRequest when remove of subcolumn without supercolumn
    is requested (CASSANDRA-1866)
  * Re-cache hot keys post-compaction without hitting disk (CASSANDRA-1878)
+ * manage read repair in coordinator instead of data source, to
+   provide latency information to dynamic snitch (CASSANDRA-1873)
 
 
 0.6.8

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadCommand.java Tue Dec 21 20:41:47 2010
@@ -36,7 +36,6 @@ import org.apache.cassandra.concurrent.S
 
 public abstract class ReadCommand
 {
-    public static final String DO_REPAIR = "READ-REPAIR";
     public static final byte CMD_TYPE_GET_SLICE_BY_NAMES = 1;
     public static final byte CMD_TYPE_GET_SLICE = 2;
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadResponse.java Tue Dec 21 20:41:47 2010
@@ -63,6 +63,7 @@ private static ICompactSerializer<ReadRe
 
 	public ReadResponse(Row row)
     {
+        assert row != null;
 		row_ = row;
 	}
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ReadVerbHandler.java Tue Dec 21 20:41:47 2010
@@ -92,14 +92,6 @@ public class ReadVerbHandler implements 
             if (logger_.isDebugEnabled())
               logger_.debug("Read key " + command.key + "; sending response to " + message.getMessageId() + "@" + message.getFrom());
             MessagingService.instance.sendOneWay(response, message.getFrom());
-
-            /* Do read repair if header of the message says so */
-            if (message.getHeader(ReadCommand.DO_REPAIR) != null)
-            {
-                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                if (endpoints.size() > 1)
-                    StorageService.instance.doConsistencyCheck(row, endpoints, command);
-            }
         }
         catch (IOException ex)
         {

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/AsyncResult.java Tue Dec 21 20:41:47 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.net;
 
+import java.net.InetAddress;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,6 +36,7 @@ class AsyncResult implements IAsyncResul
     private Lock lock_ = new ReentrantLock();
     private Condition condition_;
     private long startTime_;
+    private InetAddress from;
 
     public AsyncResult()
     {        
@@ -101,14 +103,15 @@ class AsyncResult implements IAsyncResul
         }
         return result_;
     }
-      
+
     public void result(Message response)
     {        
         try
         {
             lock_.lock();
             if ( !done_.get() )
-            {                
+            {
+                from = response.getFrom();
                 result_ = response.getMessageBody();
                 done_.set(true);
                 condition_.signal();
@@ -118,5 +121,10 @@ class AsyncResult implements IAsyncResul
         {
             lock_.unlock();
         }        
-    }    
+    }
+
+    public InetAddress getFrom()
+    {
+        return from;
+    }
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/IAsyncResult.java Tue Dec 21 20:41:47 2010
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.net;
 
+import java.net.InetAddress;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -51,4 +52,6 @@ public interface IAsyncResult
      * @param result the response message
      */
     public void result(Message result);
+
+    public InetAddress getFrom();
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ConsistencyChecker.java Tue Dec 21 20:41:47 2010
@@ -38,7 +38,6 @@ import org.apache.cassandra.db.ColumnFam
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Row;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -52,7 +51,7 @@ import org.apache.cassandra.utils.FBUtil
  * (1) sends DIGEST read requests to each other replica of the given row.
  *
  * [DigestResponseHandler]
- * (2) If any of the digests to not match the local one, it sends a second round of requests
+ * (2) If any of the digests to not match the data read, it sends a second round of requests
  * to each replica, this time for the full data
  *
  * [DataRepairHandler]
@@ -64,18 +63,17 @@ class ConsistencyChecker implements Runn
 	private static Logger logger_ = Logger.getLogger(ConsistencyChecker.class);
     private static ExpiringMap<String, String> readRepairTable_ = new ExpiringMap<String, String>(DatabaseDescriptor.getRpcTimeout());
 
-    private final String table_;
     private final Row row_;
     protected final List<InetAddress> replicas_;
     private final ReadCommand readCommand_;
+    private final InetAddress dataSource;
 
-    public ConsistencyChecker(String table, Row row, List<InetAddress> endpoints, ReadCommand readCommand)
+    public ConsistencyChecker(ReadCommand command, Row row, List<InetAddress> endpoints, InetAddress dataSource)
     {
-        table_ = table;
         row_ = row;
         replicas_ = endpoints;
-        readCommand_ = readCommand;
-        assert replicas_.contains(FBUtilities.getLocalAddress());
+        readCommand_ = command;
+        this.dataSource = dataSource;
     }
 
     public void run()
@@ -90,7 +88,7 @@ class ConsistencyChecker implements Runn
             MessagingService.instance.addCallback(new DigestResponseHandler(), message.getMessageId());
             for (InetAddress endpoint : replicas_)
             {
-                if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                if (!endpoint.equals(dataSource))
                     MessagingService.instance.sendOneWay(message, endpoint);
             }
 		}
@@ -110,7 +108,7 @@ class ConsistencyChecker implements Runn
     class DigestResponseHandler implements IAsyncCallback
 	{
         private boolean repairInvoked;
-        private final byte[] localDigest = ColumnFamily.digest(row_.cf);
+        private final byte[] dataDigest = ColumnFamily.digest(row_.cf);
 
         public synchronized void response(Message response)
 		{
@@ -124,19 +122,16 @@ class ConsistencyChecker implements Runn
                 ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
                 byte[] digest = result.digest();
 
-                if (!Arrays.equals(localDigest, digest))
+                if (!Arrays.equals(dataDigest, digest))
                 {
-                    ReadResponseResolver readResponseResolver = new ReadResponseResolver(table_, replicas_.size());
-                    IAsyncCallback responseHandler = new DataRepairHandler(row_, replicas_.size(), readResponseResolver);
-
                     ReadCommand readCommand = constructReadMessage(false);
                     Message message = readCommand.makeReadMessage();
                     if (logger_.isDebugEnabled())
                         logger_.debug("Digest mismatch; re-reading " + readCommand_.key +
" from " + message.getMessageId() + "@[" + StringUtils.join(replicas_, ", ") + "]");     
                   
-                    MessagingService.instance.addCallback(responseHandler, message.getMessageId());
+                    MessagingService.instance.addCallback(new DataRepairHandler(), message.getMessageId());
                     for (InetAddress endpoint : replicas_)
                     {
-                        if (!endpoint.equals(FBUtilities.getLocalAddress()))
+                        if (!endpoint.equals(dataSource))
                             MessagingService.instance.sendOneWay(message, endpoint);
                     }
 
@@ -150,19 +145,19 @@ class ConsistencyChecker implements Runn
         }
     }
 
-	static class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
+    class DataRepairHandler implements IAsyncCallback, ICacheExpungeHook<String, String>
 	{
 		private final Collection<Message> responses_ = new LinkedBlockingQueue<Message>();
 		private final ReadResponseResolver readResponseResolver_;
 		private final int majority_;
 		
-        public DataRepairHandler(Row localRow, int responseCount, ReadResponseResolver readResponseResolver) throws IOException
+        public DataRepairHandler() throws IOException
         {
-            readResponseResolver_ = readResponseResolver;
-            majority_ = (responseCount / 2) + 1;
-            // wrap localRow in a response Message so it doesn't need to be special-cased in the resolver
-            ReadResponse readResponse = new ReadResponse(localRow);
-            Message fakeMessage = new Message(FBUtilities.getLocalAddress(), StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
+            readResponseResolver_ = new ReadResponseResolver(readCommand_.table, replicas_.size());
+            majority_ = (replicas_.size() / 2) + 1;
+            // wrap original data Row in a response Message so it doesn't need to be special-cased in the resolver
+            ReadResponse readResponse = new ReadResponse(row_);
+            Message fakeMessage = new Message(dataSource, StageManager.RESPONSE_STAGE, StorageService.Verb.READ_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);
             responses_.add(fakeMessage);
             readResponseResolver_.injectPreProcessed(fakeMessage, readResponse);
         }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Tue Dec 21 20:41:47 2010
@@ -363,7 +363,7 @@ public class StorageProxy implements Sto
 
         // send off all the commands asynchronously
         List<Future<Object>> localFutures = null;
-        List<IAsyncResult> remoteResults = null;
+        HashMap<ReadCommand, IAsyncResult> remoteResults = null;
         for (ReadCommand command: commands)
         {
             InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
@@ -380,13 +380,11 @@ public class StorageProxy implements Sto
             else
             {
                 if (remoteResults == null)
-                    remoteResults = new ArrayList<IAsyncResult>();
+                    remoteResults = new HashMap<ReadCommand, IAsyncResult>();
                 Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
                     logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
-                if (DatabaseDescriptor.getConsistencyCheck())
-                    message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
-                remoteResults.add(MessagingService.instance.sendRR(message, endPoint));
+                remoteResults.put(command, MessagingService.instance.sendRR(message, endPoint));
             }
         }
 
@@ -409,14 +407,16 @@ public class StorageProxy implements Sto
         }
         if (remoteResults != null)
         {
-            for (IAsyncResult iar: remoteResults)
+            for (Map.Entry<ReadCommand, IAsyncResult> entry : remoteResults.entrySet())
             {
+                IAsyncResult iar = entry.getValue();
                 byte[] body;
                 body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
                 ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
                 ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (response.row() != null)
-                    rows.add(response.row());
+                assert response.row() != null;
+                rows.add(response.row());
+                StorageService.instance.doConsistencyCheck(response.row(), entry.getKey(), iar.getFrom());
             }
         }
 
@@ -711,14 +711,7 @@ public class StorageProxy implements Sto
 
             Table table = Table.open(command.table);
             Row row = command.getRow(table);
-
-            // Do the consistency checks in the background
-            if (DatabaseDescriptor.getConsistencyCheck())
-            {
-                List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
-                if (endpoints.size() > 1)
-                    StorageService.instance.doConsistencyCheck(row, endpoints, command);
-            }
+            StorageService.instance.doConsistencyCheck(row, command, FBUtilities.getLocalAddress());
 
             return row;
         }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=1051640&r1=1051639&r2=1051640&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Tue Dec 21 20:41:47 2010
@@ -418,15 +418,20 @@ public class StorageService implements I
     {
         return tokenMetadata_;
     }
-    
+
     /**
      * This method performs the requisite operations to make
      * sure that the N replicas are in sync. We do this in the
      * background when we do not care much about consistency.
      */
-    public void doConsistencyCheck(Row row, List<InetAddress> endpoints, ReadCommand command)
+    public void doConsistencyCheck(Row row, ReadCommand command, InetAddress dataSource)
     {
-        consistencyManager_.submit(new ConsistencyChecker(command.table, row, endpoints, command));
+        if (DatabaseDescriptor.getConsistencyCheck())
+        {
+            List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table, command.key);
+            if (endpoints.size() > 1)
+                consistencyManager_.submit(new ConsistencyChecker(command, row, endpoints, dataSource));
+        }
     }
 
     /**



Mime
View raw message