cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1062888 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
Date Mon, 24 Jan 2011 17:25:50 GMT
Author: jbellis
Date: Mon Jan 24 17:25:50 2011
New Revision: 1062888

URL: http://svn.apache.org/viewvc?rev=1062888&view=rev
Log:
add optimization for local reads to StorageProxy
patch by jbellis and tjake for CASSANDRA-2038

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Mon Jan 24 17:25:50 2011
@@ -11,7 +11,7 @@
  * retry hadoop split requests on connection failure (CASSANDRA-1927)
  * implement describeOwnership for BOP, COPP (CASSANDRA-1928)
  * make read repair behave as expected for ConsistencyLevel > ONE
-   (CASSANDRA-982)
+   (CASSANDRA-982, 2038)
  * distributed test harness (CASSANDRA-1859, 1964)
  * reduce flush lock contention (CASSANDRA-1930)
  * optimize supercolumn deserialization (CASSANDRA-1891)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Jan 24 17:25:50 2011
@@ -67,17 +67,7 @@ public class ReadVerbHandler implements 
             ReadCommand command = ReadCommand.serializer().deserialize(new DataInputStream(readCtx.bufIn_));
             Table table = Table.open(command.table);
             Row row = command.getRow(table);
-            ReadResponse readResponse;
-            if (command.isDigestQuery())
-            {
-                if (logger_.isDebugEnabled())
-                    logger_.debug("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
-                readResponse = new ReadResponse(ColumnFamily.digest(row.cf));
-            }
-            else
-            {
-                readResponse = new ReadResponse(row);
-            }
+            ReadResponse readResponse = getResponse(command, row);
             /* serialize the ReadResponseMessage. */
             readCtx.bufOut_.reset();
 
@@ -97,4 +87,18 @@ public class ReadVerbHandler implements 
             throw new RuntimeException(ex);
         }
     }
+
+    public static ReadResponse getResponse(ReadCommand command, Row row)
+    {
+        if (command.isDigestQuery())
+        {
+            if (logger_.isDebugEnabled())
+                logger_.debug("digest is " + ByteBufferUtil.bytesToHex(ColumnFamily.digest(row.cf)));
+            return new ReadResponse(ColumnFamily.digest(row.cf));
+        }
+        else
+        {
+            return new ReadResponse(row);
+        }
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java Mon Jan 24 17:25:50 2011
@@ -45,12 +45,12 @@ public class Header
         return serializer_;
     }
 
-    private InetAddress from_;
+    private final InetAddress from_;
     // TODO STAGE can be determined from verb
-    private StorageService.Verb verb_;
-    private String messageId_;
+    private final StorageService.Verb verb_;
+    private final String messageId_;
     protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
-    
+
     Header(String id, InetAddress from, StorageService.Verb verb)
     {
         assert id != null;
@@ -88,12 +88,7 @@ public class Header
         return messageId_;
     }
 
-    void setMessageId(String id)
-    {
-        messageId_ = id;
-    }
-    
-    byte[] getDetail(Object key)
+    byte[] getDetail(String key)
     {
         return details_.get(key);
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java Mon Jan 24 17:25:50 2011
@@ -59,7 +59,7 @@ public class Message
         this(new Header(from, verb), body);
     }    
     
-    public byte[] getHeader(Object key)
+    public byte[] getHeader(String key)
     {
         return header_.getDetail(key);
     }
@@ -94,11 +94,6 @@ public class Message
         return header_.getMessageId();
     }
 
-    void setMessageId(String id)
-    {
-        header_.setMessageId(id);
-    }    
-
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the
right len
     public Message getReply(InetAddress from, byte[] args)
     {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Mon Jan 24 17:25:50 2011
@@ -312,34 +312,6 @@ public final class MessagingService impl
     }
 
     /**
-     * Send a message to a given endpoint. The ith element in the <code>messages</code>
-     * array is sent to the ith element in the <code>to</code> array.This method
assumes
-     * there is a one-one mapping between the <code>messages</code> array and
-     * the <code>to</code> array. Otherwise an  IllegalArgumentException will
be thrown.
-     * This method also informs the MessagingService to wait for at least
-     * <code>howManyResults</code> responses to determine success of failure.
-     * @param messages messages to be sent.
-     * @param to endpoints to which the message needs to be sent
-     * @param cb callback interface which is used to pass the responses or
-     *           suggest that a timeout occured to the invoker of the send().
-     * @return an reference to message id used to match with the result
-     */
-    public String sendRR(Message[] messages, List<InetAddress> to, IAsyncCallback cb)
-    {
-        if (messages.length != to.size())
-            throw new IllegalArgumentException("Number of messages and the number of endpoints
need to be same.");
-        String groupId = GuidGenerator.guid();
-        addCallback(cb, groupId);
-        for ( int i = 0; i < messages.length; ++i )
-        {
-            messages[i].setMessageId(groupId);
-            putTarget(groupId, to.get(i));
-            sendOneWay(messages[i], to.get(i));
-        }
-        return groupId;
-    } 
-    
-    /**
      * Send a message to a given endpoint. This method adheres to the fire and forget
      * style messaging.
      * @param message messages to be sent.

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadCallback.java Mon Jan 24 17:25:50 2011
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
@@ -89,6 +90,15 @@ public class ReadCallback<T> implements 
         if (resolver.isDataPresent())
             condition.signal();
     }
+
+    public void response(ReadResponse result)
+    {
+        ((ReadResponseResolver) resolver).injectPreProcessed(result);
+        if (resolver.getMessageCount() < blockfor)
+            return;
+        if (resolver.isDataPresent())
+            condition.signal();
+    }
     
     public int determineBlockFor(ConsistencyLevel consistencyLevel, String table)
     {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ReadResponseResolver.java Mon Jan 24 17:25:50 2011
@@ -29,12 +29,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 /**
@@ -48,6 +50,7 @@ public class ReadResponseResolver implem
     private final ConcurrentMap<Message, ReadResponse> results = new NonBlockingHashMap<Message,
ReadResponse>();
     private DecoratedKey key;
     private ByteBuffer digest;
+    private static final Message FAKE_MESSAGE = new Message(FBUtilities.getLocalAddress(),
StorageService.Verb.INTERNAL_RESPONSE, ArrayUtils.EMPTY_BYTE_ARRAY);;
 
     public ReadResponseResolver(String table, ByteBuffer key)
     {
@@ -225,10 +228,11 @@ public class ReadResponseResolver implem
         }
     }
 
-    /** hack so ConsistencyChecker doesn't have to serialize/deserialize an extra real Message
*/
-    public void injectPreProcessed(Message message, ReadResponse result)
+    /** hack so local reads don't force de/serialization of an extra real Message */
+    public void injectPreProcessed(ReadResponse result)
     {
-        results.put(message, result);
+        assert results.get(FAKE_MESSAGE) == null; // should only be one local reply
+        results.put(FAKE_MESSAGE, result);
     }
 
     public boolean isDataPresent()

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1062888&r1=1062887&r2=1062888&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jan 24 17:25:50 2011
@@ -343,14 +343,9 @@ public class StorageProxy implements Sto
         for (ReadCommand command: commands)
         {
             assert !command.isDigestQuery();
-            ReadCommand readMessageDigestOnly = command.copy();
-            readMessageDigestOnly.setDigestQuery(true);
-            Message message = command.makeReadMessage();
-            Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
 
             List<InetAddress> endpoints = StorageService.instance.getLiveNaturalEndpoints(command.table,
command.key);
             DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(),
endpoints);
-            InetAddress dataPoint = endpoints.get(0);
 
             ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
             ReadCallback<Row> handler = getReadCallback(resolver, command.table, consistency_level);
@@ -366,19 +361,52 @@ public class StorageProxy implements Sto
             {
                 endpoints = endpoints.subList(0, handler.blockfor);
             }
-            Message[] messages = new Message[endpoints.size()];
-
-            // data-request message is sent to dataPoint, the node that will actually get
+            
+            // The data-request message is sent to dataPoint, the node that will actually
get
             // the data for us. The other replicas are only sent a digest query.
-            for (int i = 0; i < messages.length; i++)
+            ReadCommand digestCommand = null;
+            if (endpoints.size() > 1)
+            {
+                digestCommand = command.copy();
+                digestCommand.setDigestQuery(true);
+            }
+
+            InetAddress dataPoint = endpoints.get(0);
+            if (dataPoint.equals(FBUtilities.getLocalAddress()))
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("reading data for " + command + " locally");
+                StageManager.getStage(Stage.READ).submit(new WeakReadLocalRunnable(command,
handler));
+            }
+            else
             {
-                InetAddress endpoint = endpoints.get(i);
-                Message m = endpoint.equals(dataPoint) ? message : messageDigestOnly;
-                messages[i] = m;
+                Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
-                    logger.debug("reading " + (m == message ? "data" : "digest") + " for
" + command + " from " + m.getMessageId() + "@" + endpoint);
+                    logger.debug("reading digest for " + command + " from " + message.getMessageId()
+ "@" + dataPoint);
+                MessagingService.instance().sendRR(message, dataPoint, handler);
             }
-            MessagingService.instance().sendRR(messages, endpoints, handler);
+
+            // We lazy-construct the digest Message object since it may not be necessary
if we
+            // are doing a local digest read, or no digest reads at all.
+            Message digestMessage = null;
+            for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
+            {
+                if (digestPoint.equals(FBUtilities.getLocalAddress()))
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("reading digest for " + command + " locally");
+                    StageManager.getStage(Stage.READ).submit(new WeakReadLocalRunnable(digestCommand,
handler));
+                }
+                else
+                {
+                    if (digestMessage == null)
+                        digestMessage = digestCommand.makeReadMessage();
+                    if (logger.isDebugEnabled())
+                        logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId()
+ "@" + digestPoint);
+                    MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
+                }
+            }
+
             readCallbacks.add(handler);
             commandEndpoints.add(endpoints);
         }
@@ -436,6 +464,28 @@ public class StorageProxy implements Sto
         return rows;
     }
 
+    static class WeakReadLocalRunnable extends WrappedRunnable
+    {
+        private final ReadCommand command;
+        private final ReadCallback<Row> handler;
+
+        WeakReadLocalRunnable(ReadCommand command, ReadCallback<Row> handler)
+        {
+            this.command = command;
+            this.handler = handler;
+        }
+
+        protected void runMayThrow() throws IOException
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("weakreadlocal reading " + command);
+
+            Table table = Table.open(command.table);
+            ReadResponse result = ReadVerbHandler.getResponse(command, command.getRow(table));
+            handler.response(result);
+        }
+    }
+    
     static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver,
String table, ConsistencyLevel consistencyLevel)
     {
         if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))



Mime
View raw message