cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r918186 - in /incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra: db/RowMutationVerbHandler.java net/Header.java net/Message.java service/StorageProxy.java streaming/StreamOut.java
Date Tue, 02 Mar 2010 21:22:21 GMT
Author: jbellis
Date: Tue Mar  2 21:22:21 2010
New Revision: 918186

URL: http://svn.apache.org/viewvc?rev=918186&view=rev
Log:
clean up hints/headers, add ability to hint multiple targets per message.  patch by jbellis;
reviewed by Ryan King for CASSANDRA-822

Modified:
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
Tue Mar  2 21:22:21 2010
@@ -21,12 +21,15 @@
 import java.io.*;
 
 import java.net.InetAddress;
+import java.nio.ByteBuffer;
+
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class RowMutationVerbHandler implements IVerbHandler
 {
@@ -45,15 +48,21 @@
 
             /* Check if there were any hints in this message */
             byte[] hintedBytes = message.getHeader(RowMutation.HINT);
-            if ( hintedBytes != null && hintedBytes.length > 0 )
+            if (hintedBytes != null)
             {
-            	InetAddress hint = InetAddress.getByAddress(hintedBytes);
-                if (logger_.isDebugEnabled())
-                  logger_.debug("Adding hint for " + hint);
-                /* add necessary hints to this mutation */
-                RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable());
-                hintedMutation.addHints(rm.key(), hintedBytes);
-                hintedMutation.apply();
+                assert hintedBytes.length > 0;
+                ByteBuffer bb = ByteBuffer.wrap(hintedBytes);
+                byte[] addressBytes = new byte[FBUtilities.getLocalAddress().getAddress().length];
+                while (bb.remaining() > 0)
+                {
+                    bb.get(addressBytes);
+                    InetAddress hint = InetAddress.getByAddress(addressBytes);
+                    if (logger_.isDebugEnabled())
+                        logger_.debug("Adding hint for " + hint);
+                    RowMutation hintedMutation = new RowMutation(Table.SYSTEM_TABLE, rm.getTable());
+                    hintedMutation.addHints(rm.key(), addressBytes);
+                    hintedMutation.apply();
+                }
             }
 
             Table.open(rm.getTable()).apply(rm, bytes, true);

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Header.java
Tue Mar  2 21:22:21 2010
@@ -99,35 +99,15 @@
         messageId_ = id;
     }
     
-    void setMessageType(String type)
-    {
-        type_ = type;
-    }
-    
-    void setMessageVerb(StorageService.Verb verb)
-    {
-        verb_ = verb;
-    }
-    
     byte[] getDetail(Object key)
     {
         return details_.get(key);
     }
-    
-    void removeDetail(Object key)
-    {
-        details_.remove(key);
-    }
-    
-    void addDetail(String key, byte[] value)
+
+    void setDetail(String key, byte[] value)
     {
         details_.put(key, value);
     }
-    
-    Map<String, byte[]> getDetails()
-    {
-        return details_;
-    }
 }
 
 class HeaderSerializer implements ICompactSerializer<Header>

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/Message.java
Tue Mar  2 21:22:21 2010
@@ -21,7 +21,6 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Map;
 import java.net.InetAddress;
 
 import org.apache.cassandra.concurrent.StageManager;
@@ -64,14 +63,9 @@
         return header_.getDetail(key);
     }
     
-    public void addHeader(String key, byte[] value)
+    public void setHeader(String key, byte[] value)
     {
-        header_.addDetail(key, value);
-    }
-    
-    public Map<String, byte[]> getHeaders()
-    {
-        return header_.getDetails();
+        header_.setDetail(key, value);
     }
 
     public byte[] getMessageBody()

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Tue Mar  2 21:22:21 2010
@@ -27,11 +27,10 @@
 import java.util.concurrent.Future;
 import java.lang.management.ManagementFactory;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import java.net.InetAddress;
@@ -145,7 +144,7 @@
                         else
                         {
                             Message hintedMessage = rm.makeRowMutationMessage();
-                            hintedMessage.addHeader(RowMutation.HINT, target.getAddress());
+                            addHintHeader(hintedMessage, target);
                             if (logger.isDebugEnabled())
                                 logger.debug("insert writing key " + rm.key() + " to " +
hintedMessage.getMessageId() + "@" + hintedTarget + " for " + target);
                             MessagingService.instance.sendOneWay(hintedMessage, hintedTarget);
@@ -163,7 +162,14 @@
             writeStats.addNano(System.nanoTime() - startTime);
         }
     }
-    
+
+    private static void addHintHeader(Message message, InetAddress target)
+    {
+        byte[] oldHint = message.getHeader(RowMutation.HINT);
+        byte[] hint = oldHint == null ? target.getAddress() : ArrayUtils.addAll(oldHint,
target.getAddress());
+        message.setHeader(RowMutation.HINT, hint);
+    }
+
     public static void mutateBlocking(List<RowMutation> mutations, ConsistencyLevel
consistency_level) throws UnavailableException, TimeoutException
     {
         long startTime = System.nanoTime();
@@ -214,7 +220,7 @@
                     else
                     {
                         Message hintedMessage = rm.makeRowMutationMessage();
-                        hintedMessage.addHeader(RowMutation.HINT, naturalTarget.getAddress());
+                        addHintHeader(hintedMessage, naturalTarget);
                         // (hints are part of the callback and count towards consistency
only under CL.ANY
                         if (consistency_level == ConsistencyLevel.ANY)
                             MessagingService.instance.addCallback(responseHandler, hintedMessage.getMessageId());
@@ -343,7 +349,7 @@
 
             if (logger.isDebugEnabled())
                 logger.debug("weakreadremote reading " + command + " from " + message.getMessageId()
+ "@" + endPoint);
-            message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+            message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
             iars.add(MessagingService.instance.sendRR(message, endPoint));
         }
 

Modified: incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=918186&r1=918185&r2=918186&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++ incubator/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Tue Mar  2 21:22:21 2010
@@ -130,7 +130,7 @@
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
         StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
-        message.addHeader(StreamOut.TABLE_NAME, table.getBytes());
+        message.setHeader(StreamOut.TABLE_NAME, table.getBytes());
         if (logger.isDebugEnabled())
           logger.debug("Sending a stream initiate message to " + target + " ...");
         MessagingService.instance.sendOneWay(message, target);



Mime
View raw message