cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1068101 [1/3] - in /cassandra/branches/cassandra-0.7: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/jav...
Date Mon, 07 Feb 2011 20:28:12 GMT
Author: jbellis
Date: Mon Feb  7 20:28:10 2011
New Revision: 1068101

URL: http://svn.apache.org/viewvc?rev=1068101&view=rev
Log:
move id out of Message so we can do cross-DC forwarding again
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1530

Modified:
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RangeSliceCommand.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Row.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RowMutation.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceFromReadCommand.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Truncation.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace1.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace2.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace3.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace4.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.migration.Keyspace5.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/gms.EndpointState.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/service.TreeRequest.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/service.TreeResponse.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/streaming.StreamReply.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/streaming.StreamRequestMessage.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/utils.BloomFilter.bin
    cassandra/branches/cassandra-0.7/test/data/serialization/0.7/utils.LegacyBloomFilter.bin

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/BinaryVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -32,7 +32,7 @@ public class BinaryVerbHandler implement
 {
     private static Logger logger_ = LoggerFactory.getLogger(BinaryVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     { 
         byte[] bytes = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -45,8 +45,8 @@ public class BinaryVerbHandler implement
             WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
             if (logger_.isDebugEnabled())
-              logger_.debug("binary " + rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+              logger_.debug("binary " + rm + " applied.  Sending response to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
         }
         catch (Exception e)
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -28,7 +28,7 @@ public class DefinitionsAnnounceVerbHand
 {
     
     /** someone is announcing their schema version. */
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
         MigrationManager.rectify(theirVersion, message.getFrom());

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -42,7 +42,7 @@ public class DefinitionsUpdateResponseVe
     private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
 
     /** someone sent me their data definitions */
-    public void doVerb(final Message message)
+    public void doVerb(final Message message, String id)
     {
         try
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -33,7 +33,7 @@ public class ReadRepairVerbHandler imple
 {
     private static Logger logger_ = LoggerFactory.getLogger(ReadRepairVerbHandler.class);    
     
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {          
         byte[] body = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(body);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ReadVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -45,7 +45,7 @@ public class ReadVerbHandler implements 
     /* We use this so that we can reuse readcontext objects */
     private static ThreadLocal<ReadVerbHandler.ReadContext> tls_ = new InheritableThreadLocal<ReadVerbHandler.ReadContext>();
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] body = message.getMessageBody();
         /* Obtain a Read Context from TLS */
@@ -79,8 +79,8 @@ public class ReadVerbHandler implements 
             Message response = message.getReply(FBUtilities.getLocalAddress(), bytes);
             if (logger_.isDebugEnabled())
               logger_.debug(String.format("Read key %s; sending response to %s@%s",
-                                          ByteBufferUtil.bytesToHex(command.key), message.getMessageId(), message.getFrom()));
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+                                          ByteBufferUtil.bytesToHex(command.key), id, message.getFrom()));
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
         catch (IOException ex)
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -40,7 +40,7 @@ public class RowMutationVerbHandler impl
 {
     private static Logger logger_ = LoggerFactory.getLogger(RowMutationVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] bytes = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -78,8 +78,8 @@ public class RowMutationVerbHandler impl
             WriteResponse response = new WriteResponse(rm.getTable(), rm.key(), true);
             Message responseMessage = WriteResponse.makeWriteResponseMessage(message, response);
             if (logger_.isDebugEnabled())
-              logger_.debug(rm + " applied.  Sending response to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+              logger_.debug(rm + " applied.  Sending response to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
         }
         catch (IOException e)
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -31,10 +31,10 @@ public class SchemaCheckVerbHandler impl
 {
     private final Logger logger = LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
     
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         logger.debug("Received schema check request.");
         Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes());
-        MessagingService.instance().sendOneWay(response, message.getFrom());
+        MessagingService.instance().sendReply(response, id, message.getFrom());
     }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/TruncateVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -40,7 +40,7 @@ public class TruncateVerbHandler impleme
 {
     private static Logger logger = LoggerFactory.getLogger(TruncateVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] bytes = message.getMessageBody();
         ByteArrayInputStream buffer = new ByteArrayInputStream(bytes);
@@ -76,9 +76,8 @@ public class TruncateVerbHandler impleme
 
             TruncateResponse response = new TruncateResponse(t.keyspace, t.columnFamily, true);
             Message responseMessage = TruncateResponse.makeTruncateResponseMessage(message, response);
-            logger.debug("{} applied.  Sending response to {}@{} ",
-                    new Object[]{t, message.getMessageId(), message.getFrom()});
-            MessagingService.instance().sendOneWay(responseMessage, message.getFrom());
+            logger.debug("{} applied.  Sending response to {}@{} ", new Object[]{ t, id, message.getFrom()});
+            MessagingService.instance().sendReply(responseMessage, id, message.getFrom());
         }
         catch (IOException e)
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java Mon Feb  7 20:28:10 2011
@@ -250,12 +250,12 @@ public class BootStrapper
 
     public static class BootstrapTokenVerbHandler implements IVerbHandler
     {
-        public void doVerb(Message message)
+        public void doVerb(Message message, String id)
         {
             StorageService ss = StorageService.instance;
             String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
             Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8));
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
     }
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Mon Feb  7 20:28:10 2011
@@ -37,7 +37,7 @@ public class GossipDigestAck2VerbHandler
 {
     private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAck2VerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -40,7 +40,7 @@ public class GossipDigestAckVerbHandler 
 {
     private static Logger logger_ = LoggerFactory.getLogger(GossipDigestAckVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -39,7 +39,7 @@ public class GossipDigestSynVerbHandler 
 {
     private static Logger logger_ = LoggerFactory.getLogger( GossipDigestSynVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         InetAddress from = message.getFrom();
         if (logger_.isTraceEnabled())

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Header.java Mon Feb  7 20:28:10 2011
@@ -33,8 +33,7 @@ import org.apache.cassandra.service.Stor
 public class Header
 {
     private static ICompactSerializer<Header> serializer_;
-    private static AtomicInteger idGen_ = new AtomicInteger(0);
-    
+
     static
     {
         serializer_ = new HeaderSerializer();        
@@ -48,31 +47,23 @@ public class Header
     private final InetAddress from_;
     // TODO STAGE can be determined from verb
     private final StorageService.Verb verb_;
-    private final String messageId_;
     protected Map<String, byte[]> details_ = new Hashtable<String, byte[]>();
 
-    Header(String id, InetAddress from, StorageService.Verb verb)
+    Header(InetAddress from, StorageService.Verb verb)
     {
-        assert id != null;
         assert from != null;
         assert verb != null;
 
-        messageId_ = id;
         from_ = from;
         verb_ = verb;
     }
-    
-    Header(String id, InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
+
+    Header(InetAddress from, StorageService.Verb verb, Map<String, byte[]> details)
     {
-        this(id, from, verb);
+        this(from, verb);
         details_ = details;
     }
 
-    Header(InetAddress from, StorageService.Verb verb)
-    {
-        this(Integer.toString(idGen_.incrementAndGet()), from, verb);
-    }        
-
     InetAddress getFrom()
     {
         return from_;
@@ -83,11 +74,6 @@ public class Header
         return verb_;
     }
 
-    String getMessageId()
-    {
-        return messageId_;
-    }
-
     byte[] getDetail(String key)
     {
         return details_.get(key);
@@ -108,7 +94,6 @@ class HeaderSerializer implements ICompa
 {
     public void serialize(Header t, DataOutputStream dos) throws IOException
     {           
-        dos.writeUTF(t.getMessageId());
         CompactEndpointSerializationHelper.serialize(t.getFrom(), dos);
         dos.writeInt(t.getVerb().ordinal());
         
@@ -128,7 +113,6 @@ class HeaderSerializer implements ICompa
 
     public Header deserialize(DataInputStream dis) throws IOException
     {
-        String id = dis.readUTF();
         InetAddress from = CompactEndpointSerializationHelper.deserialize(dis);
         int verbOrdinal = dis.readInt();
         
@@ -144,7 +128,7 @@ class HeaderSerializer implements ICompa
             details.put(key, bytes);
         }
         
-        return new Header(id, from, StorageService.VERBS[verbOrdinal], details);
+        return new Header(from, StorageService.VERBS[verbOrdinal], details);
     }
 }
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -31,8 +31,9 @@ public interface IVerbHandler
      * class was registered by a call to MessagingService.registerVerbHandlers).
      * Note that the caller should not be holding any locks when calling this method
      * because the implementation may be synchronized.
-     * 
-     * @param message - incoming message that needs handling.     
+     *
+     * @param message - incoming message that needs handling.
+     * @param id
      */
-    public void doVerb(Message message);
+    public void doVerb(Message message, String id);
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Mon Feb  7 20:28:10 2011
@@ -101,8 +101,10 @@ public class IncomingTcpConnection exten
                         logger.info("Received connection from newer protocol version. Ignorning message.");
                     else
                     {
-                        Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
-                        MessagingService.instance().receive(message);
+                        DataInputStream dis = new DataInputStream(new ByteArrayInputStream(contentBytes));
+                        String id = dis.readUTF();
+                        Message message = Message.serializer().deserialize(dis);
+                        MessagingService.instance().receive(message, id);
                     }
                 }
                 // prepare to read the next message

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java Mon Feb  7 20:28:10 2011
@@ -94,21 +94,17 @@ public class Message
         return header_.getVerb();
     }
 
-    public String getMessageId()
-    {
-        return header_.getMessageId();
-    }
-
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the right len
+    // TODO make static
     public Message getReply(InetAddress from, byte[] args)
     {
-        Header header = new Header(getMessageId(), from, StorageService.Verb.REQUEST_RESPONSE);
+        Header header = new Header(from, StorageService.Verb.REQUEST_RESPONSE);
         return new Message(header, args);
     }
 
     public Message getInternalReply(byte[] body)
     {
-        Header header = new Header(getMessageId(), FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+        Header header = new Header(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
         return new Message(header, body);
     }
 
@@ -116,9 +112,7 @@ public class Message
     {
         StringBuilder sbuf = new StringBuilder("");
         String separator = System.getProperty("line.separator");
-        sbuf.append("ID:" + getMessageId())
-        	.append(separator)
-        	.append("FROM:" + getFrom())
+        sbuf.append("FROM:" + getFrom())
         	.append(separator)
         	.append("TYPE:" + getMessageType())
         	.append(separator)

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Mon Feb  7 20:28:10 2011
@@ -28,18 +28,20 @@ public class MessageDeliveryTask impleme
 {
     private static final Logger logger_ = LoggerFactory.getLogger(MessageDeliveryTask.class);    
 
-    private Message message_;
-    private final long constructionTime_ = System.currentTimeMillis();
+    private Message message;
+    private final long constructionTime = System.currentTimeMillis();
+    private final String id;
 
-    public MessageDeliveryTask(Message message)
+    public MessageDeliveryTask(Message message, String id)
     {
         assert message != null;
-        message_ = message;    
+        this.message = message;
+        this.id = id;
     }
     
     public void run()
     { 
-        StorageService.Verb verb = message_.getVerb();
+        StorageService.Verb verb = message.getVerb();
         switch (verb)
         {
             case BINARY:
@@ -48,7 +50,7 @@ public class MessageDeliveryTask impleme
             case RANGE_SLICE:
             case READ_REPAIR:
             case REQUEST_RESPONSE:
-                if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
+                if (System.currentTimeMillis() > constructionTime + DatabaseDescriptor.getRpcTimeout())
                 {
                     MessagingService.instance().incrementDroppedMessages(verb);
                     return;
@@ -67,6 +69,6 @@ public class MessageDeliveryTask impleme
 
         IVerbHandler verbHandler = MessagingService.instance().getVerbHandler(verb);
         assert verbHandler != null : "unknown verb " + verb;
-        verbHandler.doVerb(message_);
+        verbHandler.doVerb(message, id);
     }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Mon Feb  7 20:28:10 2011
@@ -34,8 +34,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,11 +51,10 @@ import org.apache.cassandra.service.Stor
 import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
 import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
 public final class MessagingService implements MessagingServiceMBean
 {
@@ -262,6 +259,13 @@ public final class MessagingService impl
         assert previous == null;
     }
 
+    private static AtomicInteger idGen = new AtomicInteger(0);
+    // TODO make these integers to avoid unnecessary int -> string -> int conversions
+    private static String nextId()
+    {
+        return Integer.toString(idGen.incrementAndGet());
+    }
+
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
@@ -272,12 +276,24 @@ public final class MessagingService impl
      *           suggest that a timeout occurred to the invoker of the send().
      * @return an reference to message id used to match with the result
      */
-    public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
+    public String sendRR(Message message, InetAddress to, IMessageCallback cb)
     {        
-        String messageId = message.getMessageId();
-        addCallback(cb, messageId, to);
-        sendOneWay(message, to);
-        return messageId;
+        String id = nextId();
+        if (logger_.isDebugEnabled())
+            logger_.debug("Sending " + message.getVerb() + " to " + id + "@" + to);
+        addCallback(cb, id, to);
+        sendOneWay(message, id, to);
+        return id;
+    }
+
+    public void sendOneWay(Message message, InetAddress to)
+    {
+        sendOneWay(message, nextId(), to);
+    }
+
+    public void sendReply(Message message, String id, InetAddress to)
+    {
+        sendOneWay(message, id, to);
     }
 
     /**
@@ -286,12 +302,12 @@ public final class MessagingService impl
      * @param message messages to be sent.
      * @param to endpoint to which the message needs to be sent
      */
-    public void sendOneWay(Message message, InetAddress to)
+    private void sendOneWay(Message message, String id, InetAddress to)
     {
         // do local deliveries
         if ( message.getFrom().equals(to) )
         {
-            receive(message);
+            receive(message, id);
             return;
         }
 
@@ -310,6 +326,7 @@ public final class MessagingService impl
         try
         {
             DataOutputBuffer buffer = new DataOutputBuffer();
+            buffer.writeUTF(id);
             Message.serializer().serialize(message, buffer);
             data = buffer.getData();
         }
@@ -327,8 +344,7 @@ public final class MessagingService impl
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
-        addCallback(iar, message.getMessageId(), to);
-        sendOneWay(message, to);
+        sendRR(message, to, iar);
         return iar;
     }
     
@@ -376,13 +392,13 @@ public final class MessagingService impl
         logger_.info("Shutdown complete (no further commands will be processed)");
     }
 
-    public void receive(Message message)
+    public void receive(Message message, String id)
     {
         message = SinkManager.processServerMessage(message);
         if (message == null)
             return;
 
-        Runnable runnable = new MessageDeliveryTask(message);
+        Runnable runnable = new MessageDeliveryTask(message, id);
         ExecutorService stage = StageManager.getStage(message.getMessageType());
         assert stage != null : "No stage for message type " + message.getMessageType();
         stage.execute(runnable);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -19,8 +19,6 @@
 package org.apache.cassandra.net;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,14 +29,13 @@ public class ResponseVerbHandler impleme
 {
     private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class );
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {     
-        String messageId = message.getMessageId();
-        double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(messageId);
-        Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(messageId);
+        double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id);
+        Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(id);
         if (pair == null)
         {
-            logger_.debug("Callback already removed for {}", messageId);
+            logger_.debug("Callback already removed for {}", id);
             return;
         }
 
@@ -48,13 +45,13 @@ public class ResponseVerbHandler impleme
         if (cb instanceof IAsyncCallback)
         {
             if (logger_.isDebugEnabled())
-                logger_.debug("Processing response on a callback from " + message.getMessageId() + "@" + message.getFrom());
+                logger_.debug("Processing response on a callback from " + id + "@" + message.getFrom());
             ((IAsyncCallback) cb).response(message);
         }
         else
         {
             if (logger_.isDebugEnabled())
-                logger_.debug("Processing response on an async result from " + message.getMessageId() + "@" + message.getFrom());
+                logger_.debug("Processing response on an async result from " + id + "@" + message.getFrom());
             ((IAsyncResult) cb).result(message);
         }
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/AntiEntropyService.java Mon Feb  7 20:28:10 2011
@@ -23,7 +23,6 @@ import java.net.InetAddress;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -578,7 +577,7 @@ public class AntiEntropyService
         /**
          * Trigger a validation compaction which will return the tree upon completion.
          */
-        public void doVerb(Message message)
+        public void doVerb(Message message, String id)
         { 
             byte[] bytes = message.getMessageBody();
             
@@ -645,7 +644,7 @@ public class AntiEntropyService
             }
         }
 
-        public void doVerb(Message message)
+        public void doVerb(Message message, String id)
         { 
             byte[] bytes = message.getMessageBody();
             DataInputStream buffer = new DataInputStream(new ByteArrayInputStream(bytes));

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/IndexScanVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -33,7 +33,7 @@ public class IndexScanVerbHandler implem
 {
     private static final Logger logger = LoggerFactory.getLogger(IndexScanVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         try
         {
@@ -43,8 +43,8 @@ public class IndexScanVerbHandler implem
             RangeSliceReply reply = new RangeSliceReply(rows);
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
-                logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+                logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
         catch (Exception ex)
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -35,7 +35,7 @@ public class RangeSliceVerbHandler imple
 
     private static final Logger logger = LoggerFactory.getLogger(RangeSliceVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         try
         {
@@ -52,8 +52,8 @@ public class RangeSliceVerbHandler imple
                                                                           QueryFilter.getFilter(command.predicate, cfs.getComparator())));
             Message response = reply.getReply(message);
             if (logger.isDebugEnabled())
-                logger.debug("Sending " + reply+ " to " + message.getMessageId() + "@" + message.getFrom());
-            MessagingService.instance().sendOneWay(response, message.getFrom());
+                logger.debug("Sending " + reply+ " to " + id + "@" + message.getFrom());
+            MessagingService.instance().sendReply(response, id, message.getFrom());
         }
         catch (Exception ex)
         {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Feb  7 20:28:10 2011
@@ -144,21 +144,6 @@ public class StorageLoadBalancer impleme
         */
     }
 
-    class MoveMessageVerbHandler implements IVerbHandler
-    {
-        public void doVerb(Message message)
-        {
-            Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get() ? 1 : 0)});
-            MessagingService.instance().sendOneWay(reply, message.getFrom());
-            if ( isMoveable_.get() )
-            {
-                // MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
-                /* Start the leave operation and join the ring at the position specified */
-                isMoveable_.set(false);
-            }
-        }
-    }
-
     private static final int BROADCAST_INTERVAL = 60 * 1000;
 
     public static final StorageLoadBalancer instance = new StorageLoadBalancer();

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageProxy.java Mon Feb  7 20:28:10 2011
@@ -131,6 +131,7 @@ public class StorageProxy implements Sto
                 
                 // Multimap that holds onto all the messages and addresses meant for a specific datacenter
                 Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
+                Message unhintedMessage = null;
 
                 for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
                 {
@@ -149,9 +150,10 @@ public class StorageProxy implements Sto
                         else
                         {
                             // belongs on a different server
-                            Message unhintedMessage = rm.makeRowMutationMessage();
+                            if (unhintedMessage == null)
+                                unhintedMessage = rm.makeRowMutationMessage();
                             if (logger.isDebugEnabled())
-                                logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
+                                logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination);
                             
                             Multimap<Message, InetAddress> messages = dcMessages.get(dc);
                             if (messages == null)
@@ -165,7 +167,8 @@ public class StorageProxy implements Sto
                     }
                     else
                     {
-                        // hinted
+                        // hinted messages are unique, so there is no point to adding a hop by forwarding via another node.
+                        // thus, we use sendRR/sendOneWay directly here.
                         Message hintedMessage = rm.makeRowMutationMessage();
                         for (InetAddress target : targets)
                         {
@@ -173,25 +176,14 @@ public class StorageProxy implements Sto
                             {
                                 addHintHeader(hintedMessage, target);
                                 if (logger.isDebugEnabled())
-                                    logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
+                                    logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target);
                             }
                         }
-                        // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
-                        // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
+                        // non-destination hints are part of the callback and count towards consistency only under CL.ANY
                         if (writeEndpoints.contains(destination) || consistency_level == ConsistencyLevel.ANY)
                             MessagingService.instance().sendRR(hintedMessage, destination, responseHandler);
                         else
                             MessagingService.instance().sendOneWay(hintedMessage, destination);
-
-                        Multimap<Message, InetAddress> messages = dcMessages.get(dc);
-                        
-                        if (messages == null)
-                        {
-                           messages = HashMultimap.create();
-                           dcMessages.put(dc, messages);
-                        }
-
-                        messages.put(hintedMessage, destination);
                     }
                 }
 
@@ -229,6 +221,9 @@ public class StorageProxy implements Sto
             for (Map.Entry<Message, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
             {
                 Message message = messages.getKey();
+                // a single message object is used for unhinted writes, so clean out any forwards
+                // from previous loop iterations
+                message.removeHeader(RowMutation.FORWARD_HEADER);
 
                 if (dataCenter.equals(localDataCenter))
                 {
@@ -375,12 +370,13 @@ public class StorageProxy implements Sto
             {
                 Message message = command.makeReadMessage();
                 if (logger.isDebugEnabled())
-                    logger.debug("reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
+                    logger.debug("reading data for " + command + " from " + dataPoint);
                 MessagingService.instance().sendRR(message, dataPoint, handler);
             }
 
             // We lazy-construct the digest Message object since it may not be necessary if we
             // are doing a local digest read, or no digest reads at all.
+            Message digestMessage = null;
             for (InetAddress digestPoint : endpoints.subList(1, endpoints.size()))
             {
                 if (digestPoint.equals(FBUtilities.getLocalAddress()))
@@ -391,9 +387,10 @@ public class StorageProxy implements Sto
                 }
                 else
                 {
-                    Message digestMessage = digestCommand.makeReadMessage();
+                    if (digestMessage == null)
+                        digestMessage = digestCommand.makeReadMessage();
                     if (logger.isDebugEnabled())
-                        logger.debug("reading digest for " + command + " from " + digestMessage.getMessageId() + "@" + digestPoint);
+                        logger.debug("reading digest for " + command + " from " + digestPoint);
                     MessagingService.instance().sendRR(digestMessage, digestPoint, handler);
                 }
             }
@@ -493,11 +490,9 @@ public class StorageProxy implements Sto
     {
         ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
         RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
+        Message messageRepair = command.makeReadMessage();
         for (InetAddress endpoint : endpoints)
-        {
-            Message messageRepair = command.makeReadMessage();
             MessagingService.instance().sendRR(messageRepair, endpoint, handler);
-        }
         return handler;
     }
 
@@ -546,18 +541,18 @@ public class StorageProxy implements Sto
                 {
                     DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
                     RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
+                    Message message = c2.getMessage();
 
                     // collect replies and resolve according to consistency level
                     RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(command.keyspace, liveEndpoints);
                     AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
                     ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
                     // TODO bail early if live endpoints can't satisfy requested consistency level
-                    for (InetAddress endpoint : liveEndpoints) 
+                    for (InetAddress endpoint : liveEndpoints)
                     {
-                        Message message = c2.getMessage();
                         MessagingService.instance().sendRR(message, endpoint, handler);
                         if (logger.isDebugEnabled())
-                            logger.debug("reading " + c2 + " from " + message.getMessageId() + "@" + endpoint);
+                            logger.debug("reading " + c2 + " from " + endpoint);
                     }
                     // TODO read repair on remaining replicas?
 
@@ -811,12 +806,12 @@ public class StorageProxy implements Sto
                 throw new UnavailableException();
             
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
+            Message message = command.getMessage();
             for (InetAddress endpoint : liveEndpoints)
             {
-                Message message = command.getMessage();
                 MessagingService.instance().sendRR(message, endpoint, handler);
                 if (logger.isDebugEnabled())
-                    logger.debug("reading " + command + " from " + message.getMessageId() + "@" + endpoint);
+                    logger.debug("reading " + command + " from " + endpoint);
             }
 
             List<Row> theseRows;
@@ -899,11 +894,9 @@ public class StorageProxy implements Sto
         // Send out the truncate calls and track the responses with the callbacks.
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         Truncation truncation = new Truncation(keyspace, cfname);
+        Message message = truncation.makeTruncationMessage();
         for (InetAddress endpoint : allEndpoints)
-        {
-            Message message = truncation.makeTruncationMessage();
             MessagingService.instance().sendRR(message, endpoint, responseHandler);
-        }
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -33,12 +33,12 @@ public class ReplicationFinishedVerbHand
 {
     private static Logger logger = LoggerFactory.getLogger(ReplicationFinishedVerbHandler.class);
 
-    public void doVerb(Message msg)
+    public void doVerb(Message msg, String id)
     {
         StorageService.instance.confirmReplication(msg.getFrom());
         Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY);
         if (logger.isDebugEnabled())
-            logger.debug("Replying to " + msg.getMessageId() + "@" + msg.getFrom());
-        MessagingService.instance().sendOneWay(response, msg.getFrom());
+            logger.debug("Replying to " + id + "@" + msg.getFrom());
+        MessagingService.instance().sendReply(response, id, msg.getFrom());
     }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -36,7 +36,7 @@ public class StreamReplyVerbHandler impl
 {
     private static Logger logger = LoggerFactory.getLogger(StreamReplyVerbHandler.class);
 
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         byte[] body = message.getMessageBody();
         ByteArrayInputStream bufIn = new ByteArrayInputStream(body);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/StreamRequestVerbHandler.java Mon Feb  7 20:28:10 2011
@@ -37,7 +37,7 @@ public class StreamRequestVerbHandler im
 {
     private static Logger logger = LoggerFactory.getLogger(StreamRequestVerbHandler.class);
     
-    public void doVerb(Message message)
+    public void doVerb(Message message, String id)
     {
         if (logger.isDebugEnabled())
             logger.debug("Received a StreamRequestMessage from {}", message.getFrom());

Modified: cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RangeSliceCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RangeSliceCommand.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Row.bin
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Row.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RowMutation.bin
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.RowMutation.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceByNamesReadCommand.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceFromReadCommand.bin
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.SliceFromReadCommand.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.

Modified: cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Truncation.bin
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/data/serialization/0.7/db.Truncation.bin?rev=1068101&r1=1068100&r2=1068101&view=diff
==============================================================================
Binary files - no diff available.



Mime
View raw message