cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1057935 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/serv...
Date Tue, 11 Jan 2011 23:41:24 GMT
Author: jbellis
Date: Tue Jan 11 23:41:23 2011
New Revision: 1057935

URL: http://svn.apache.org/viewvc?rev=1057935&view=rev
Log:
merge from 0.7

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 11 23:41:23 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121
-/cassandra/branches/cassandra-0.7:1026516-1057740
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.7:1026516-1057933
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Tue Jan 11 23:41:23 2011
@@ -23,6 +23,7 @@
  * fix CFMetaData.apply to only compare objects of the same class 
    (CASSANDRA-1962)
  * allow specifying specific SSTables to compact from JMX (CASSANDRA-1963)
+ * fix race condition in MessagingService.targets (CASSANDRA-1959)
 
 
 0.7.0-dev

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 11 23:41:23 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1057740
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1057933
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 11 23:41:23 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1057740
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1057933
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 11 23:41:23 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1057740
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1057933
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 11 23:41:23 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1057740
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1057933
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Jan 11 23:41:23 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1057740
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1057933
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Tue Jan 11 23:41:23 2011
@@ -95,7 +95,7 @@ class AsyncResult implements IAsyncResul
             lock.unlock();
         }        
 
-        MessagingService.removeRegisteredCallback(response.getMessageId());
+        MessagingService.instance().removeRegisteredCallback(response.getMessageId());
     }
 
     public InetAddress getFrom()

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/IncomingTcpConnection.java Tue Jan 11 23:41:23 2011
@@ -88,7 +88,7 @@ public class IncomingTcpConnection exten
                     input.readFully(contentBytes);
                     
                     Message message = Message.serializer().deserialize(new DataInputStream(new ByteArrayInputStream(contentBytes)));
-                    MessagingService.receive(message);
+                    MessagingService.instance().receive(message);
                 }
                 // prepare to read the next message
                 MessagingService.validateMagic(input.readInt());

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessageDeliveryTask.java Tue Jan 11 23:41:23 2011
@@ -50,7 +50,7 @@ public class MessageDeliveryTask impleme
             case REQUEST_RESPONSE:
                 if (System.currentTimeMillis() > constructionTime_ + DatabaseDescriptor.getRpcTimeout())
                 {
-                    MessagingService.incrementDroppedMessages(verb);
+                    MessagingService.instance().incrementDroppedMessages(verb);
                     return;
                 }
                 break;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan 11 23:41:23 2011
@@ -30,6 +30,7 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
 import java.util.*;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -58,37 +59,37 @@ import org.apache.cassandra.utils.Expiri
 import org.apache.cassandra.utils.GuidGenerator;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-public class MessagingService implements MessagingServiceMBean, ILatencyPublisher
+public final class MessagingService implements MessagingServiceMBean, ILatencyPublisher
 {
-    private static int version_ = 1;
+    private static final int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
-    private static SerializerType serializerType_ = SerializerType.BINARY;
+    private SerializerType serializerType_ = SerializerType.BINARY;
 
     /** we preface every message with this number so the recipient can validate the sender is sane */
-    public static final int PROTOCOL_MAGIC = 0xCA552DFA;
+    private static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private static ExpiringMap<String, IMessageCallback> callbacks;
-    private static Multimap<String, InetAddress> targets;
+    private final ExpiringMap<String, IMessageCallback> callbacks;
+    private final ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
 
     /* Lookup table for registering message handlers based on the verb. */
-    private static Map<StorageService.Verb, IVerbHandler> verbHandlers_;
+    private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
     /* Thread pool to handle messaging write activities */
-    private static ExecutorService streamExecutor_;
+    private final ExecutorService streamExecutor_;
     
-    private static NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
+    private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
     
-    private static Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
-    private static int LOG_DROPPED_INTERVAL_IN_MS = 5000;
+    private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
+    private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
 
     private SocketThread socketThread;
-    private SimpleCondition listenGate;
-    private static final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
+    private final SimpleCondition listenGate;
+    private final Map<StorageService.Verb, AtomicInteger> droppedMessages = new EnumMap<StorageService.Verb, AtomicInteger>(StorageService.Verb.class);
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
 
-    static
     {
         for (StorageService.Verb verb : StorageService.Verb.values())
             droppedMessages.put(verb, new AtomicInteger());
@@ -103,13 +104,7 @@ public class MessagingService implements
         return MSHandle.instance;
     }
 
-    public Object clone() throws CloneNotSupportedException
-    {
-        //Prevents the singleton from being cloned
-        throw new CloneNotSupportedException();
-    }
-
-    protected MessagingService()
+    private MessagingService()
     {
         listenGate = new SimpleCondition();
         verbHandlers_ = new EnumMap<StorageService.Verb, IVerbHandler>(StorageService.Verb.class);
@@ -127,7 +122,7 @@ public class MessagingService implements
         {
             public Object apply(String messageId)
             {
-                Collection<InetAddress> addresses = targets.removeAll(messageId);
+                Collection<InetAddress> addresses = targets.remove(messageId);
                 if (addresses == null)
                     return null;
 
@@ -140,7 +135,6 @@ public class MessagingService implements
                 return null;
             }
         };
-        targets = ArrayListMultimap.create();
         callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
@@ -154,7 +148,7 @@ public class MessagingService implements
         }
     }
 
-    public byte[] hash(String type, byte data[])
+    public static byte[] hash(String type, byte data[])
     {
         byte result[];
         try
@@ -203,7 +197,7 @@ public class MessagingService implements
         }
     }
 
-    public static OutboundTcpConnectionPool getConnectionPool(InetAddress to)
+    public OutboundTcpConnectionPool getConnectionPool(InetAddress to)
     {
         OutboundTcpConnectionPool cp = connectionManagers_.get(to);
         if (cp == null)
@@ -214,7 +208,7 @@ public class MessagingService implements
         return cp;
     }
 
-    public static OutboundTcpConnection getConnection(InetAddress to, Message msg)
+    public OutboundTcpConnection getConnection(InetAddress to, Message msg)
     {
         return getConnectionPool(to).getConnection(msg);
     }
@@ -255,12 +249,33 @@ public class MessagingService implements
         addCallback(cb, messageId);
         for (InetAddress endpoint : to)
         {
-            targets.put(messageId, endpoint);
+            putTarget(messageId, endpoint);
             sendOneWay(message, endpoint);
         }
         return messageId;
     }
 
+    private void putTarget(String messageId, InetAddress endpoint)
+    {
+        Collection<InetAddress> addresses = targets.get(messageId);
+        if (addresses == null)
+        {
+            addresses = new NonBlockingHashSet<InetAddress>();
+            Collection<InetAddress> oldAddresses = targets.putIfAbsent(messageId, addresses);
+            if (oldAddresses != null)
+                addresses = oldAddresses;
+        }
+        addresses.add(endpoint);
+    }
+
+    private void removeTarget(String messageId, InetAddress from)
+    {
+        Collection<InetAddress> addresses = targets.get(messageId);
+        // null is expected if we removed the callback or we got a reply after its timeout expired
+        if (addresses != null)
+            addresses.remove(from);
+    }
+
     public void addCallback(IAsyncCallback cb, String messageId)
     {
         callbacks.put(messageId, cb);
@@ -280,7 +295,7 @@ public class MessagingService implements
     {        
         String messageId = message.getMessageId();
         addCallback(cb, messageId);
-        targets.put(messageId, to);
+        putTarget(messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
@@ -307,7 +322,7 @@ public class MessagingService implements
         for ( int i = 0; i < messages.length; ++i )
         {
             messages[i].setMessageId(groupId);
-            targets.put(groupId, to.get(i));
+            putTarget(groupId, to.get(i));
             sendOneWay(messages[i], to.get(i));
         }
         return groupId;
@@ -324,7 +339,7 @@ public class MessagingService implements
         // do local deliveries
         if ( message.getFrom().equals(to) )
         {
-            MessagingService.receive(message);
+            receive(message);
             return;
         }
 
@@ -361,7 +376,7 @@ public class MessagingService implements
     {
         IAsyncResult iar = new AsyncResult();
         callbacks.put(message.getMessageId(), iar);
-        targets.put(message.getMessageId(), to);
+        putTarget(message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
@@ -385,19 +400,19 @@ public class MessagingService implements
     }
 
     /** blocks until the processing pools are empty and done. */
-    public static void waitFor() throws InterruptedException
+    public void waitFor() throws InterruptedException
     {
         while (!streamExecutor_.isTerminated())
             streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
     }
 
-    public static void shutdown()
+    public void shutdown()
     {
         logger_.info("Shutting down MessageService...");
 
         try
         {
-            instance().socketThread.close();
+            socketThread.close();
         }
         catch (IOException e)
         {
@@ -410,7 +425,7 @@ public class MessagingService implements
         logger_.info("Shutdown complete (no further commands will be processed)");
     }
 
-    public static void receive(Message message)
+    public void receive(Message message)
     {
         message = SinkManager.processServerMessage(message);
         if (message == null)
@@ -422,25 +437,25 @@ public class MessagingService implements
         stage.execute(runnable);
     }
 
-    public static IMessageCallback getRegisteredCallback(String messageId)
+    public IMessageCallback getRegisteredCallback(String messageId)
     {
         return callbacks.get(messageId);
     }
     
-    public static IMessageCallback removeRegisteredCallback(String messageId)
+    public IMessageCallback removeRegisteredCallback(String messageId)
     {
-        targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR
+        targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR
         return callbacks.remove(messageId);
     }
 
-    public static long getRegisteredCallbackAge(String messageId)
+    public long getRegisteredCallbackAge(String messageId)
     {
         return callbacks.getAge(messageId);
     }
 
-    public static void responseReceivedFrom(String messageId, InetAddress from)
+    public void responseReceivedFrom(String messageId, InetAddress from)
     {
-        targets.remove(messageId, from);
+        removeTarget(messageId, from);
     }
 
     public static void validateMagic(int magic) throws IOException
@@ -454,7 +469,7 @@ public class MessagingService implements
         return x >>> (p + 1) - n & ~(-1 << n);
     }
         
-    public static ByteBuffer packIt(byte[] bytes, boolean compress)
+    public ByteBuffer packIt(byte[] bytes, boolean compress)
     {
         /*
              Setting up the protocol header. This is 4 bytes long
@@ -484,7 +499,7 @@ public class MessagingService implements
         return buffer;
     }
         
-    public static ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
+    public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
     {
         /* 
         Setting up the protocol header. This is 4 bytes long
@@ -535,12 +550,12 @@ public class MessagingService implements
         return buffer;
     }
 
-    public static int incrementDroppedMessages(StorageService.Verb verb)
+    public int incrementDroppedMessages(StorageService.Verb verb)
     {
         return droppedMessages.get(verb).incrementAndGet();
     }
                
-    private static void logDroppedMessages()
+    private void logDroppedMessages()
     {
         boolean logTpstats = false;
         for (Map.Entry<StorageService.Verb, AtomicInteger> entry : droppedMessages.entrySet())

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Tue Jan 11 23:41:23 2011
@@ -37,9 +37,9 @@ public class ResponseVerbHandler impleme
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();
-        MessagingService.responseReceivedFrom(messageId, message.getFrom());
-        double age = System.currentTimeMillis() - MessagingService.getRegisteredCallbackAge(messageId);
-        IMessageCallback cb = MessagingService.getRegisteredCallback(messageId);
+        MessagingService.instance().responseReceivedFrom(messageId, message.getFrom());
+        double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(messageId);
+        IMessageCallback cb = MessagingService.instance().getRegisteredCallback(messageId);
         if (cb == null)
             return;
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/ReadCallback.java Tue Jan 11 23:41:23 2011
@@ -83,9 +83,10 @@ public class ReadCallback<T> implements 
 
     public void close()
     {
+        MessagingService ms = MessagingService.instance();
         for (Message response : resolver.getMessages())
         {
-            MessagingService.removeRegisteredCallback(response.getMessageId());
+            ms.removeRegisteredCallback(response.getMessageId());
         }
     }
     

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Tue Jan 11 23:41:23 2011
@@ -276,7 +276,7 @@ public class StorageService implements I
         Gossiper.instance.unregister(migrationManager);
         Gossiper.instance.unregister(this);
         Gossiper.instance.stop();
-        MessagingService.shutdown();
+        MessagingService.instance().shutdown();
         StageManager.shutdownNow();
     }
     
@@ -1500,7 +1500,7 @@ public class StorageService implements I
             public void run()
             {
                 Gossiper.instance.stop();
-                MessagingService.shutdown();
+                MessagingService.instance().shutdown();
                 StageManager.shutdownNow();
                 setMode("Decommissioned", true);
                 // let op be responsible for killing the process
@@ -1797,9 +1797,9 @@ public class StorageService implements I
         setMode("Starting drain process", true);
         Gossiper.instance.stop();
         setMode("Draining: shutting down MessageService", false);
-        MessagingService.shutdown();
+        MessagingService.instance().shutdown();
         setMode("Draining: emptying MessageService pools", false);
-        MessagingService.waitFor();
+        MessagingService.instance().waitFor();
 
         setMode("Draining: clearing mutation stage", false);
         mutationStage.shutdown();

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStreamTask.java Tue Jan 11 23:41:23 2011
@@ -84,7 +84,7 @@ public class FileStreamTask extends Wrap
 
     private void stream(SocketChannel channel) throws IOException
     {
-        ByteBuffer buffer = MessagingService.constructStreamHeader(header, false);
+        ByteBuffer buffer = MessagingService.instance().constructStreamHeader(header, false);
         channel.write(buffer);
         assert buffer.remaining() == 0;
         if (header.file == null)

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1057935&r1=1057934&r2=1057935&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Tue Jan 11 23:41:23 2011
@@ -84,7 +84,7 @@ public class RemoveTest extends CleanupH
     public void tearDown()
     {
         SinkManager.clear();
-        MessagingService.shutdown();
+        MessagingService.instance().shutdown();
         ss.setPartitionerUnsafe(oldPartitioner);
     }
 



Mime
View raw message