cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1024327 - in /cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service: IResponseResolver.java QuorumResponseHandler.java RangeSliceResponseResolver.java ReadResponseResolver.java
Date Tue, 19 Oct 2010 17:20:46 GMT
Author: jbellis
Date: Tue Oct 19 17:20:45 2010
New Revision: 1024327

URL: http://svn.apache.org/viewvc?rev=1024327&view=rev
Log:
avoid deserializing QUORUM responses twice.  patch by jbellis; tested by Wayne for CASSANDRA-1622

Modified:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/IResponseResolver.java
Tue Oct 19 17:20:45 2010
@@ -37,4 +37,5 @@ public interface IResponseResolver<T> {
 	public T resolve(Collection<Message> responses) throws DigestMismatchException, IOException;
 	public boolean isDataPresent(Collection<Message> responses);
 
+    public void preprocess(Message message);
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
Tue Oct 19 17:20:45 2010
@@ -89,6 +89,7 @@ public class QuorumResponseHandler<T> im
     public void response(Message message)
     {
         responses.add(message);
+        responseResolver.preprocess(message);
         if (responseResolver.isDataPresent(responses))
         {
             condition.signal();

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
Tue Oct 19 17:20:45 2010
@@ -108,6 +108,10 @@ public class RangeSliceResponseResolver 
         return resolvedRows;
     }
 
+    public void preprocess(Message message)
+    {
+    }
+
     public boolean isDataPresent(Collection<Message> responses)
     {
         return responses.size() >= sources.size();

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1024327&r1=1024326&r2=1024327&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/ReadResponseResolver.java
Tue Oct 19 17:20:45 2010
@@ -22,10 +22,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
+import java.util.*;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadResponse;
@@ -37,6 +34,7 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.log4j.Logger;
 
@@ -49,6 +47,7 @@ public class ReadResponseResolver implem
 	private static Logger logger_ = Logger.getLogger(ReadResponseResolver.class);
     private final String table;
     private final int responseCount;
+    private final Map<Message, ReadResponse> results = new NonBlockingHashMap<Message,
ReadResponse>();
 
     public ReadResponseResolver(String table, int responseCount)
     {
@@ -84,11 +83,11 @@ public class ReadResponseResolver implem
          * query exists then we need to compare the digest with 
          * the digest of the data that is received.
         */
-		for (Message response : responses)
-		{					            
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+		for (Message message : responses)
+		{
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived after quorum already achieved
             if (result.isDigestQuery())
             {
                 digest = result.digest();
@@ -97,14 +96,11 @@ public class ReadResponseResolver implem
             else
             {
                 versions.add(result.row().cf);
-                endPoints.add(response.getFrom());
+                endPoints.add(message.getFrom());
                 key = result.row().key;
             }
         }
 
-        if (logger_.isDebugEnabled())
-            logger_.debug("responses deserialized");
-
 		// If there was a digest query compare it with all the data digests
 		// If there is a mismatch then throw an exception so that read repair can happen.
         if (isDigestQuery)
@@ -190,30 +186,36 @@ public class ReadResponseResolver implem
         return resolved;
     }
 
-	public boolean isDataPresent(Collection<Message> responses)
-	{
-        if (responses.size() < responseCount)
-            return false;
-
-        boolean isDataPresent = false;
-        for (Message response : responses)
+    public void preprocess(Message message)
+    {
+        byte[] body = message.getMessageBody();
+        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+        try
         {
-            byte[] body = response.getMessageBody();
-            ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
-            try
-            {
-                ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
-                if (!result.isDigestQuery())
-                {
-                    isDataPresent = true;
-                }
-                bufIn.close();
-            }
-            catch (IOException ex)
-            {
-                throw new RuntimeException(ex);
-            }
+            ReadResponse result = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+            results.put(message, result);
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
         }
-        return isDataPresent;
     }
+
+    public boolean isDataPresent(Collection<Message> responses)
+	{
+        int digests = 0;
+        int data = 0;
+        for (Message message : responses)
+        {
+            ReadResponse result = results.get(message);
+            if (result == null)
+                continue; // arrived concurrently
+            if (result.isDigestQuery())
+                digests++;
+            else
+                data++;
+        }
+        return data > 0 && (data + digests >= responseCount);
+    }
+
 }



Mime
View raw message