cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1029224 - in /cassandra/branches/cassandra-0.7: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src...
Date Sat, 30 Oct 2010 23:48:21 GMT
Author: jbellis
Date: Sat Oct 30 23:48:20 2010
New Revision: 1029224

URL: http://svn.apache.org/viewvc?rev=1029224&view=rev
Log:
add INTERNAL_RESPONSE verb to differentiate from responses related to client requests.  patch
by jbellis; reviewed by Stu Hood for CASSANDRA-1685

Modified:
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Sat Oct 30 23:48:20 2010
@@ -6,6 +6,8 @@ dev
  * fix IntegerType.getString (CASSANDRA-1681)
  * log tpstats when dropping messages (CASSANDRA-1660)
  * make -Djava.net.preferIPv4Stack=true the default (CASSANDRA-628)
+ * add INTERNAL_RESPONSE verb to differentiate from responses related
+   to client requests (CASSANDRA-1685)
 
 
 0.7.0-beta3

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/Stage.java Sat
Oct 30 23:48:20 2010
@@ -27,10 +27,11 @@ public enum Stage
     MUTATION,
     STREAM,
     GOSSIP,
-    RESPONSE,
+    REQUEST_RESPONSE,
     ANTIENTROPY,
     MIGRATION,
-    MISC;
+    MISC,
+    INTERNAL_RESPONSE;
 
     public String getJmxType()
     {
@@ -41,10 +42,11 @@ public enum Stage
             case MIGRATION:
             case MISC:
             case STREAM:
+            case INTERNAL_RESPONSE:
                 return "internal";
             case MUTATION:
             case READ:
-            case RESPONSE:
+            case REQUEST_RESPONSE:
                 return "request";
             default:
                 throw new AssertionError("Unknown stage " + this);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/concurrent/StageManager.java
Sat Oct 30 23:48:20 2010
@@ -42,7 +42,8 @@ public class StageManager
     {
         stages.put(Stage.MUTATION, multiThreadedConfigurableStage(Stage.MUTATION, getConcurrentWriters()));
         stages.put(Stage.READ, multiThreadedConfigurableStage(Stage.READ, getConcurrentReaders()));
       
-        stages.put(Stage.RESPONSE, multiThreadedStage(Stage.RESPONSE, Math.max(2, Runtime.getRuntime().availableProcessors())));
+        stages.put(Stage.REQUEST_RESPONSE, multiThreadedStage(Stage.REQUEST_RESPONSE, Math.max(2,
Runtime.getRuntime().availableProcessors())));
+        stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, Math.max(1,
Runtime.getRuntime().availableProcessors())));
         // the rest are all single-threaded
         stages.put(Stage.STREAM, new JMXEnabledThreadPoolExecutor(Stage.STREAM));
         stages.put(Stage.GOSSIP, new JMXEnabledThreadPoolExecutor(Stage.GOSSIP));

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/SchemaCheckVerbHandler.java
Sat Oct 30 23:48:20 2010
@@ -31,11 +31,10 @@ public class SchemaCheckVerbHandler impl
 {
     private final Logger logger = LoggerFactory.getLogger(SchemaCheckVerbHandler.class);
     
-    @Override
     public void doVerb(Message message)
     {
         logger.debug("Received schema check request.");
-        Message response = message.getReply(FBUtilities.getLocalAddress(), DatabaseDescriptor.getDefsVersion().toString().getBytes());
+        Message response = message.getInternalReply(DatabaseDescriptor.getDefsVersion().toString().getBytes());
         MessagingService.instance.sendOneWay(response, message.getFrom());
     }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/dht/BootStrapper.java Sat
Oct 30 23:48:20 2010
@@ -254,7 +254,7 @@ public class BootStrapper
         {
             StorageService ss = StorageService.instance;
             String tokenString = StorageService.getPartitioner().getTokenFactory().toString(ss.getBootstrapToken());
-            Message response = message.getReply(FBUtilities.getLocalAddress(), tokenString.getBytes(Charsets.UTF_8));
+            Message response = message.getInternalReply(tokenString.getBytes(Charsets.UTF_8));
             MessagingService.instance.sendOneWay(response, message.getFrom());
         }
     }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
Sat Oct 30 23:48:20 2010
@@ -29,7 +29,6 @@ import java.lang.management.ManagementFa
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.AbstractStatsDeque;
@@ -181,7 +180,7 @@ public class DynamicEndpointSnitch exten
     {
         if (!registered)
         {
-       	    ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.READ_RESPONSE);
+       	    ILatencyPublisher handler = (ILatencyPublisher)MessagingService.instance.getVerbHandler(StorageService.Verb.REQUEST_RESPONSE);
             if (handler != null)
             {
                 handler.register(this);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/Message.java Sat Oct
30 23:48:20 2010
@@ -26,6 +26,7 @@ import java.net.InetAddress;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class Message
 {
@@ -101,10 +102,16 @@ public class Message
     // TODO should take byte[] + length so we don't have to copy to a byte[] of exactly the
right len
     public Message getReply(InetAddress from, byte[] args)
     {
-        Header header = new Header(getMessageId(), from, StorageService.Verb.READ_RESPONSE);
+        Header header = new Header(getMessageId(), from, StorageService.Verb.REQUEST_RESPONSE);
         return new Message(header, args);
     }
-    
+
+    public Message getInternalReply(byte[] body)
+    {
+        Header header = new Header(getMessageId(), FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE);
+        return new Message(header, body);
+    }
+
     public String toString()
     {
         StringBuilder sbuf = new StringBuilder("");

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
Sat Oct 30 23:48:20 2010
@@ -42,7 +42,9 @@ class OutboundTcpConnectionPool
     OutboundTcpConnection getConnection(Message msg)
     {
         Stage stage = msg.getMessageType();
-        return stage == Stage.RESPONSE || stage == Stage.GOSSIP ? ackCon : cmdCon;
+        return stage == Stage.REQUEST_RESPONSE || stage == Stage.INTERNAL_RESPONSE || stage
== Stage.GOSSIP
+               ? ackCon
+               : cmdCon;
     }
 
     synchronized void reset()

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/ConsistencyChecker.java
Sat Oct 30 23:48:20 2010
@@ -23,7 +23,6 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -33,8 +32,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
 
-import org.apache.cassandra.concurrent.Stage;
-import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ReadCommand;
@@ -166,7 +163,7 @@ class ConsistencyChecker implements Runn
             ReadResponse.serializer().serialize(readResponse, out);
             byte[] bytes = new byte[out.getLength()];
             System.arraycopy(out.getData(), 0, bytes, 0, bytes.length);
-            responses_.add(new Message(FBUtilities.getLocalAddress(), StorageService.Verb.READ_RESPONSE,
bytes));
+            responses_.add(new Message(FBUtilities.getLocalAddress(), StorageService.Verb.INTERNAL_RESPONSE,
bytes));
         }
 
         // synchronized so the " == majority" is safe

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Sat Oct 30 23:48:20 2010
@@ -148,7 +148,7 @@ public class StorageLoadBalancer impleme
     {
         public void doVerb(Message message)
         {
-            Message reply = message.getReply(FBUtilities.getLocalAddress(), new byte[] {(byte)(isMoveable_.get()
? 1 : 0)});
+            Message reply = message.getInternalReply(new byte[] {(byte)(isMoveable_.get()
? 1 : 0)});
             MessagingService.instance.sendOneWay(reply, message.getFrom());
             if ( isMoveable_.get() )
             {

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Sat Oct 30 23:48:20 2010
@@ -145,7 +145,7 @@ public class StorageService implements I
         BINARY,
         READ_REPAIR,
         READ,
-        READ_RESPONSE,
+        REQUEST_RESPONSE, // client-initiated reads and writes
         STREAM_INITIATE, // Deprecated
         STREAM_INITIATE_DONE, // Deprecated
         STREAM_REPLY,
@@ -164,6 +164,7 @@ public class StorageService implements I
         SCHEMA_CHECK,
         INDEX_SCAN,
         REPLICATION_FINISHED,
+        INTERNAL_RESPONSE, // responses to internal calls
         ;
         // remember to add new verbs at the end, since we serialize by ordinal
     }
@@ -175,7 +176,7 @@ public class StorageService implements I
         put(Verb.BINARY, Stage.MUTATION);
         put(Verb.READ_REPAIR, Stage.MUTATION);
         put(Verb.READ, Stage.READ);
-        put(Verb.READ_RESPONSE, Stage.RESPONSE);
+        put(Verb.REQUEST_RESPONSE, Stage.REQUEST_RESPONSE);
         put(Verb.STREAM_REPLY, Stage.MISC); // TODO does this really belong on misc? I've
just copied old behavior here
         put(Verb.STREAM_REQUEST, Stage.STREAM);
         put(Verb.RANGE_SLICE, Stage.READ);
@@ -191,6 +192,7 @@ public class StorageService implements I
         put(Verb.SCHEMA_CHECK, Stage.MIGRATION);
         put(Verb.INDEX_SCAN, Stage.READ);
         put(Verb.REPLICATION_FINISHED, Stage.MISC);
+        put(Verb.INTERNAL_RESPONSE, Stage.INTERNAL_RESPONSE);
     }};
 
 
@@ -284,7 +286,8 @@ public class StorageService implements I
         MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler()
);
         MessagingService.instance.registerVerbHandlers(Verb.STREAM_REPLY, new StreamReplyVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.REPLICATION_FINISHED, new ReplicationFinishedVerbHandler());
-        MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new ResponseVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.REQUEST_RESPONSE, new ResponseVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.INTERNAL_RESPONSE, new ResponseVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler());
 

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java?rev=1029224&r1=1029223&r2=1029224&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/streaming/ReplicationFinishedVerbHandler.java
Sat Oct 30 23:48:20 2010
@@ -20,6 +20,7 @@ package org.apache.cassandra.streaming;
  * 
  */
 
+import org.apache.commons.lang.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,7 @@ public class ReplicationFinishedVerbHand
     public void doVerb(Message msg)
     {
         StorageService.instance.confirmReplication(msg.getFrom());
-        Message response = msg.getReply(FBUtilities.getLocalAddress(), new byte[]{});
+        Message response = msg.getInternalReply(ArrayUtils.EMPTY_BYTE_ARRAY);
         if (logger.isDebugEnabled())
             logger.debug("Replying to " + msg.getMessageId() + "@" + msg.getFrom());
         MessagingService.instance.sendOneWay(response, msg.getFrom());



Mime
View raw message