cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1064763 - in /cassandra/trunk: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/hadoop/ src/java/org/apache/cassandra/io/ src/java/...
Date Fri, 28 Jan 2011 16:27:03 GMT
Author: jbellis
Date: Fri Jan 28 16:27:02 2011
New Revision: 1064763

URL: http://svn.apache.org/viewvc?rev=1064763&view=rev
Log:
merge from 0.7

Removed:
    cassandra/trunk/src/java/org/apache/cassandra/locator/ILatencyPublisher.java
Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java   (props changed)
    cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java
    cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
    cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 28 16:27:02 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7:1026516-1064181,1064342
+/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932,1064193
+/cassandra/branches/cassandra-0.7:1026516-1064342
 /cassandra/branches/cassandra-0.7.0:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jan 28 16:27:02 2011
@@ -13,6 +13,9 @@
  * add JVM shutdownhook to sync commitlog (CASSANDRA-1919)
  * allow nodes to be up without being part of  normal traffic (CASSANDRA-1951)
  * fix CLI "show keyspaces" with null options on NTS (CASSANDRA-2049)
+ * fix possible ByteBuffer race conditions (CASSANDRA-2066)
+ * reduce garbage generated by MessagingService to prevent load spikes
+   (CASSANDRA-2058)
 
 
 0.7.1

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 28 16:27:02 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1064181,1064342
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932,1064193
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1064342
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 28 16:27:02 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1064181,1064342
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932,1064193
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1064342
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 28 16:27:02 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1064181,1064342
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932,1064193
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1064342
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 28 16:27:02 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1064181,1064342
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932,1064193
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1064342
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 28 16:27:02 2011
@@ -1,5 +1,5 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932
-/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1064181,1064342
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1055311,1056121,1057932,1064193
+/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1064342
 /cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Fri Jan 28 16:27:02 2011
@@ -124,7 +124,7 @@ public class HintedHandOffManager
             rm.add(cf);
             Message message = rm.makeRowMutationMessage();
             IWriteResponseHandler responseHandler =  WriteResponseHandler.create(endpoint);
-            MessagingService.instance().sendRR(message, Arrays.asList(endpoint), responseHandler);
+            MessagingService.instance().sendRR(message, endpoint, responseHandler);
             try
             {
                 responseHandler.get();

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/RandomPartitioner.java Fri Jan 28 16:27:02 2011
@@ -98,11 +98,7 @@ public class RandomPartitioner implement
 
         public Token<BigInteger> fromByteArray(ByteBuffer bytes)
         {
-            byte[] b = new byte[bytes.remaining()];
-            bytes.get(b);
-            bytes.rewind();
-            
-            return new BigIntegerToken(new BigInteger(b));
+            return new BigIntegerToken(new BigInteger(ByteBufferUtil.getArray(bytes)));
         }
 
         public String toString(Token<BigInteger> bigIntegerToken)

Modified: cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java Fri Jan 28 16:27:02 2011
@@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TSocket;
 
-import static org.apache.cassandra.io.SerDeUtils.copy;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
@@ -169,7 +169,7 @@ implements org.apache.hadoop.mapred.Reco
             org.apache.cassandra.hadoop.avro.SlicePredicate apred = amut.deletion.predicate;
             if (amut.deletion.super_column != null)
                 // super column
-                deletion.setSuper_column(copy(amut.deletion.super_column));
+                deletion.setSuper_column(ByteBufferUtil.getArray(amut.deletion.super_column));
             else if (apred.column_names != null)
             {
                 // column names

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/SerDeUtils.java Fri Jan 28 16:27:02 2011
@@ -45,14 +45,6 @@ public final class SerDeUtils
     // unbuffered decoders
     private final static DecoderFactory DIRECT_DECODERS = new DecoderFactory().configureDirectDecoder(true);
 
-    public static byte[] copy(ByteBuffer buff)
-    {
-        byte[] bytes = new byte[buff.remaining()];
-        buff.get(bytes);
-        buff.rewind();
-        return bytes;
-    }
-
 	/**
      * Deserializes a single object based on the given Schema.
      * @param writer writer's schema

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java Fri Jan 28 16:27:02 2011
@@ -208,10 +208,9 @@ public class DynamicEndpointSnitch exten
             return;
         if (!registered)
         {
-       	    ILatencyPublisher handler = (ILatencyPublisher) MessagingService.instance().getVerbHandler(StorageService.Verb.REQUEST_RESPONSE);
-            if (handler != null)
+            if (MessagingService.instance() != null)
             {
-                handler.register(this);
+                MessagingService.instance().register(this);
                 registered = true;
             }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Fri Jan 28 16:27:02 2011
@@ -94,8 +94,6 @@ class AsyncResult implements IAsyncResul
         {
             lock.unlock();
         }        
-
-        MessagingService.instance().removeRegisteredCallback(response.getMessageId());
     }
 
     public InetAddress getFrom()

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jan 28 16:27:02 2011
@@ -27,7 +27,6 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
 import java.util.*;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,8 +34,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.base.Function;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,23 +43,22 @@ import org.apache.cassandra.config.Confi
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.locator.ILatencyPublisher;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
-import org.apache.cassandra.service.GCInspector;
 import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.security.streaming.SSLFileStreamTask;
+import org.apache.cassandra.service.GCInspector;
+import org.apache.cassandra.service.ReadCallback;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
 import org.apache.cassandra.utils.ExpiringMap;
-import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SimpleCondition;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.cliffc.high_scale_lib.NonBlockingHashSet;
 
-public final class MessagingService implements MessagingServiceMBean, ILatencyPublisher
+public final class MessagingService implements MessagingServiceMBean
 {
     private static final int version_ = 1;
     //TODO: make this parameter dynamic somehow.  Not sure if config is appropriate.
@@ -72,17 +68,16 @@ public final class MessagingService impl
     private static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private final ExpiringMap<String, IMessageCallback> callbacks;
-    private final ConcurrentMap<String, Collection<InetAddress>> targets = new NonBlockingHashMap<String, Collection<InetAddress>>();
+    private final ExpiringMap<String, Pair<InetAddress, IMessageCallback>> callbacks;
 
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
 
     /* Thread pool to handle messaging write activities */
     private final ExecutorService streamExecutor_;
-    
+
     private final NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool> connectionManagers_ = new NonBlockingHashMap<InetAddress, OutboundTcpConnectionPool>();
-    
+
     private static final Logger logger_ = LoggerFactory.getLogger(MessagingService.class);
     private static final int LOG_DROPPED_INTERVAL_IN_MS = 5000;
 
@@ -119,24 +114,16 @@ public final class MessagingService impl
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
-        Function<String, ?> timeoutReporter = new Function<String, Object>()
+        Function<Pair<String, Pair<InetAddress, IMessageCallback>>, ?> timeoutReporter = new Function<Pair<String, Pair<InetAddress, IMessageCallback>>, Object>()
         {
-            public Object apply(String messageId)
+            public Object apply(Pair<String, Pair<InetAddress, IMessageCallback>> pair)
             {
-                Collection<InetAddress> addresses = targets.remove(messageId);
-                if (addresses == null)
-                    return null;
-
-                for (InetAddress address : addresses)
-                {
-                    for (ILatencySubscriber subscriber : subscribers)
-                        subscriber.receiveTiming(address, (double) DatabaseDescriptor.getRpcTimeout());
-                }
-
+                Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
+                maybeAddLatency(expiredValue.right, expiredValue.left, (double) DatabaseDescriptor.getRpcTimeout());
                 return null;
             }
         };
-        callbacks = new ExpiringMap<String, IMessageCallback>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
+        callbacks = new ExpiringMap<String, Pair<InetAddress, IMessageCallback>>((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -149,6 +136,24 @@ public final class MessagingService impl
         }
     }
 
+    /**
+     * Track latency information for the dynamic snitch
+     * @param cb: the callback associated with this message -- this lets us know if it's a message type we're interested in
+     * @param address: the host that replied to the message
+     * @param latency
+     */
+    public void maybeAddLatency(IMessageCallback cb, InetAddress address, double latency)
+    {
+        if (cb instanceof ReadCallback || cb instanceof AsyncResult)
+            addLatency(address, latency);
+    }
+
+    public void addLatency(InetAddress address, double latency)
+    {
+        for (ILatencySubscriber subscriber : subscribers)
+            subscriber.receiveTiming(address, latency);
+    }
+
     public static byte[] hash(String type, byte data[])
     {
         byte result[];
@@ -242,7 +247,7 @@ public final class MessagingService impl
     {
         return getConnectionPool(to).getConnection(msg);
     }
-        
+
     /**
      * Register a verb and the corresponding verb handler with the
      * Messaging Service.
@@ -254,7 +259,7 @@ public final class MessagingService impl
     	assert !verbHandlers_.containsKey(verb);
     	verbHandlers_.put(verb, verbHandler);
     }
-        
+
     /**
      * This method returns the verb handler associated with the registered
      * verb. If no handler has been registered then null is returned.
@@ -266,49 +271,9 @@ public final class MessagingService impl
         return verbHandlers_.get(type);
     }
 
-    /**
-     * Send a message to a given endpoint.
-     * @param message message to be sent.
-     * @param to endpoint to which the message needs to be sent
-     * @return an reference to an IAsyncResult which can be queried for the
-     * response
-     */
-    public String sendRR(Message message, Collection<InetAddress> to, IAsyncCallback cb)
-    {
-        String messageId = message.getMessageId();
-        addCallback(cb, messageId);
-        for (InetAddress endpoint : to)
-        {
-            putTarget(messageId, endpoint);
-            sendOneWay(message, endpoint);
-        }
-        return messageId;
-    }
-
-    private void putTarget(String messageId, InetAddress endpoint)
+    private void addCallback(IMessageCallback cb, String messageId, InetAddress to)
     {
-        Collection<InetAddress> addresses = targets.get(messageId);
-        if (addresses == null)
-        {
-            addresses = new NonBlockingHashSet<InetAddress>();
-            Collection<InetAddress> oldAddresses = targets.putIfAbsent(messageId, addresses);
-            if (oldAddresses != null)
-                addresses = oldAddresses;
-        }
-        addresses.add(endpoint);
-    }
-
-    private void removeTarget(String messageId, InetAddress from)
-    {
-        Collection<InetAddress> addresses = targets.get(messageId);
-        // null is expected if we removed the callback or we got a reply after its timeout expired
-        if (addresses != null)
-            addresses.remove(from);
-    }
-
-    public void addCallback(IAsyncCallback cb, String messageId)
-    {
-        callbacks.put(messageId, cb);
+        callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb));
     }
 
     /**
@@ -322,10 +287,9 @@ public final class MessagingService impl
      * @return an reference to message id used to match with the result
      */
     public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
-    {        
+    {
         String messageId = message.getMessageId();
-        addCallback(cb, messageId);
-        putTarget(messageId, to);
+        addCallback(cb, messageId, to);
         sendOneWay(message, to);
         return messageId;
     }
@@ -373,16 +337,15 @@ public final class MessagingService impl
         // write it
         connection.write(buffer);
     }
-    
+
     public IAsyncResult sendRR(Message message, InetAddress to)
     {
         IAsyncResult iar = new AsyncResult();
-        callbacks.put(message.getMessageId(), iar);
-        putTarget(message.getMessageId(), to);
+        addCallback(iar, message.getMessageId(), to);
         sendOneWay(message, to);
         return iar;
     }
-    
+
     /**
      * Stream a file from source to destination. This is highly optimized
      * to not hold any of the contents of the file in memory.
@@ -398,7 +361,7 @@ public final class MessagingService impl
         else
             streamExecutor_.execute(new FileStreamTask(header, to));
     }
-    
+
     public void register(ILatencySubscriber subcriber)
     {
         subscribers.add(subcriber);
@@ -442,14 +405,8 @@ public final class MessagingService impl
         stage.execute(runnable);
     }
 
-    public IMessageCallback getRegisteredCallback(String messageId)
-    {
-        return callbacks.get(messageId);
-    }
-    
-    public IMessageCallback removeRegisteredCallback(String messageId)
+    public Pair<InetAddress, IMessageCallback> removeRegisteredCallback(String messageId)
     {
-        targets.remove(messageId);
         return callbacks.remove(messageId);
     }
 
@@ -458,11 +415,6 @@ public final class MessagingService impl
         return callbacks.getAge(messageId);
     }
 
-    public void responseReceivedFrom(String messageId, InetAddress from)
-    {
-        removeTarget(messageId, from);
-    }
-
     public static void validateMagic(int magic) throws IOException
     {
         if (magic != PROTOCOL_MAGIC)
@@ -473,7 +425,7 @@ public final class MessagingService impl
     {
         return x >>> (p + 1) - n & ~(-1 << n);
     }
-        
+
     public ByteBuffer packIt(byte[] bytes, boolean compress)
     {
         /*
@@ -483,8 +435,8 @@ public final class MessagingService impl
              is turned on or off. It is turned off by default. The 4th
              bit indicates if we are in streaming mode. It is turned off
              by default. The 5th-8th bits are reserved for future use.
-             The next 8 bits indicate a version number. Remaining 15 bits 
-             are not used currently.            
+             The next 8 bits indicate a version number. Remaining 15 bits
+             are not used currently.
         */
         int header = 0;
         // Setting up the serializer bit
@@ -503,18 +455,18 @@ public final class MessagingService impl
         buffer.flip();
         return buffer;
     }
-        
+
     public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress)
     {
-        /* 
+        /*
         Setting up the protocol header. This is 4 bytes long
         represented as an integer. The first 2 bits indicate
         the serializer type. The 3rd bit indicates if compression
         is turned on or off. It is turned off by default. The 4th
         bit indicates if we are in streaming mode. It is turned off
-        by default. The following 4 bits are reserved for future use. 
-        The next 8 bits indicate a version number. Remaining 15 bits 
-        are not used currently.            
+        by default. The following 4 bits are reserved for future use.
+        The next 8 bits indicate a version number. Remaining 15 bits
+        are not used currently.
         */
         int header = 0;
         // Setting up the serializer bit
@@ -559,7 +511,7 @@ public final class MessagingService impl
     {
         return droppedMessages.get(verb).incrementAndGet();
     }
-               
+
     private void logDroppedMessages()
     {
         boolean logTpstats = false;
@@ -582,7 +534,7 @@ public final class MessagingService impl
     private static class SocketThread extends Thread
     {
         private final ServerSocket server;
-        
+
         SocketThread(ServerSocket server, String name)
         {
             super(name);
@@ -610,7 +562,7 @@ public final class MessagingService impl
                 }
             }
         }
-        
+
         void close() throws IOException
         {
             server.close();

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Fri Jan 28 16:27:02 2011
@@ -25,27 +25,22 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.locator.ILatencyPublisher;
-import org.apache.cassandra.locator.ILatencySubscriber;
+import org.apache.cassandra.utils.Pair;
 
-public class ResponseVerbHandler implements IVerbHandler, ILatencyPublisher
+public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class );
-    private List<ILatencySubscriber>  subscribers = new ArrayList<ILatencySubscriber>();
-
 
     public void doVerb(Message message)
     {     
         String messageId = message.getMessageId();
-        MessagingService.instance().responseReceivedFrom(messageId, message.getFrom());
         double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(messageId);
-        IMessageCallback cb = MessagingService.instance().getRegisteredCallback(messageId);
-        if (cb == null)
+        Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(messageId);
+        if (pair == null)
             return;
 
-        // if cb is not null, then age will be valid
-        for (ILatencySubscriber subscriber : subscribers)
-            subscriber.receiveTiming(message.getFrom(), age);
+        IMessageCallback cb = pair.right;
+        MessagingService.instance().maybeAddLatency(cb, message.getFrom(), age);
 
         if (cb instanceof IAsyncCallback)
         {
@@ -60,9 +55,4 @@ public class ResponseVerbHandler impleme
             ((IAsyncResult) cb).result(message);
         }
     }
-
-    public void register(ILatencySubscriber subscriber)
-    {
-        subscribers.add(subscriber);
-    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java Fri Jan 28 16:27:02 2011
@@ -70,13 +70,6 @@ public abstract class AbstractWriteRespo
         }
     }
 
-    public void addHintCallback(Message hintedMessage, InetAddress destination)
-    {
-        // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
-        if (writeEndpoints.contains(destination) || consistencyLevel == ConsistencyLevel.ANY)
-            MessagingService.instance().addCallback(this, hintedMessage.getMessageId());
-    }
-
     /** null message means "response from local write" */
     public abstract void response(Message msg);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java Fri Jan 28 16:27:02 2011
@@ -20,16 +20,13 @@ package org.apache.cassandra.service;
  *
  */
 
-import java.net.InetAddress;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.UnavailableException;
 
 public interface IWriteResponseHandler extends IAsyncCallback
 {
     public void get() throws TimeoutException;
-    public void addHintCallback(Message hintedMessage, InetAddress destination);
     public void assureSufficientLiveNodes() throws UnavailableException;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Fri Jan 28 16:27:02 2011
@@ -104,18 +104,18 @@ public class StorageProxy implements Sto
 
         standardWritePerformer = new WritePerformer()
         {
-            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter) throws IOException
+            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
             {
                 assert mutation instanceof RowMutation;
-                sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, responseHandler, localDataCenter, true);
+                sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, responseHandler, localDataCenter, true, consistency_level);
             }
         };
 
         counterWritePerformer = new WritePerformer()
         {
-            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter) throws IOException
+            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
             {
-                applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter);
+                applyCounterMutation(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
             }
         };
     }
@@ -174,7 +174,7 @@ public class StorageProxy implements Sto
                 responseHandler.assureSufficientLiveNodes();
 
                 responseHandlers.add(responseHandler);
-                performer.apply(mutation, hintedEndpoints, responseHandler, localDataCenter);
+                performer.apply(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
             }
             // wait for writes.  throws timeoutexception if necessary
             for (IWriteResponseHandler responseHandler : responseHandlers)
@@ -201,12 +201,11 @@ public class StorageProxy implements Sto
         return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints);
     }
 
-    private static void sendToHintedEndpoints(RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages)
+    private static void sendToHintedEndpoints(RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, boolean insertLocalMessages, ConsistencyLevel consistency_level)
     throws IOException
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
         Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
-        Message unhintedMessage = null;
 
         for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
         {
@@ -226,19 +225,15 @@ public class StorageProxy implements Sto
                 else
                 {
                     // belongs on a different server
-                    if (unhintedMessage == null)
-                    {
-                        unhintedMessage = rm.makeRowMutationMessage();
-                        MessagingService.instance().addCallback(responseHandler, unhintedMessage.getMessageId());
-                    }
+                    Message unhintedMessage = rm.makeRowMutationMessage();
                     if (logger.isDebugEnabled())
                         logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + unhintedMessage.getMessageId() + "@" + destination);
 
                     Multimap<Message, InetAddress> messages = dcMessages.get(dc);
                     if (messages == null)
                     {
-                        messages = HashMultimap.create();
-                        dcMessages.put(dc, messages);
+                       messages = HashMultimap.create();
+                       dcMessages.put(dc, messages);
                     }
 
                     messages.put(unhintedMessage, destination);
@@ -257,27 +252,32 @@ public class StorageProxy implements Sto
                             logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + hintedMessage.getMessageId() + "@" + destination + " for " + target);
                     }
                 }
-                responseHandler.addHintCallback(hintedMessage, destination);
+                // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
+                // (non-destination hints are part of the callback and count towards consistency only under CL.ANY)
+                if (targets.contains(destination) || consistency_level == ConsistencyLevel.ANY)
+                    MessagingService.instance().sendRR(hintedMessage, destination, responseHandler);
+                else
+                    MessagingService.instance().sendOneWay(hintedMessage, destination);
 
                 Multimap<Message, InetAddress> messages = dcMessages.get(dc);
 
                 if (messages == null)
                 {
-                    messages = HashMultimap.create();
-                    dcMessages.put(dc, messages);
+                   messages = HashMultimap.create();
+                   dcMessages.put(dc, messages);
                 }
 
                 messages.put(hintedMessage, destination);
             }
-        }
 
-        sendMessages(localDataCenter, dcMessages);
+            sendMessages(localDataCenter, dcMessages, responseHandler);
+        }
     }
 
     /**
      * for each datacenter, send a message to one node to relay the write to other replicas
      */
-    private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages)
+    private static void sendMessages(String localDataCenter, Map<String, Multimap<Message, InetAddress>> dcMessages, IWriteResponseHandler handler)
     throws IOException
     {
         for (Map.Entry<String, Multimap<Message, InetAddress>> entry: dcMessages.entrySet())
@@ -288,15 +288,12 @@ public class StorageProxy implements Sto
             for (Map.Entry<Message, Collection<InetAddress>> messages: entry.getValue().asMap().entrySet())
             {
                 Message message = messages.getKey();
-                // a single message object is used for unhinted writes, so clean out any forwards
-                // from previous loop iterations
-                message.removeHeader(RowMutation.FORWARD_HEADER);
 
                 if (dataCenter.equals(localDataCenter))
                 {
                     // direct writes to local DC
                     for (InetAddress destination : messages.getValue())
-                        MessagingService.instance().sendOneWay(message, destination);
+                        MessagingService.instance().sendRR(message, destination, handler);
                 }
                 else
                 {
@@ -320,7 +317,7 @@ public class StorageProxy implements Sto
                         message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
                     }
                     // send the combined message + forward headers
-                    MessagingService.instance().sendOneWay(message, target);
+                    MessagingService.instance().sendRR(message, target, handler);
                 }
             }
         }
@@ -390,7 +387,7 @@ public class StorageProxy implements Sto
                 }
                 else
                 {
-                    // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica 
+                    // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
                     String table = cm.getTable();
                     AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
                     Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key());
@@ -401,11 +398,10 @@ public class StorageProxy implements Sto
                     IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
                     responseHandlers.add(responseHandler);
 
-                    Message msg = cm.makeMutationMessage();
-                    MessagingService.instance().addCallback(responseHandler, msg.getMessageId());
+                    Message message = cm.makeMutationMessage();
                     if (logger.isDebugEnabled())
-                        logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + msg.getMessageId() + "@" + endpoint);
-                    MessagingService.instance().sendOneWay(msg, endpoint);
+                        logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + message.getMessageId() + "@" + endpoint);
+                    MessagingService.instance().sendRR(message, endpoint, responseHandler);
                 }
             }
             // wait for writes.  throws timeoutexception if necessary
@@ -443,7 +439,7 @@ public class StorageProxy implements Sto
         write(Collections.singletonList(cm), cm.consistency(), counterWritePerformer, false);
     }
 
-    private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter)
+    private static void applyCounterMutation(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level)
     {
         // we apply locally first, then send it to other replica
         if (logger.isDebugEnabled())
@@ -463,15 +459,15 @@ public class StorageProxy implements Sto
 
                 if (cm.shouldReplicateOnWrite())
                 {
-                    // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation) 
+                    // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
                     // and we want to avoid blocking too much the MUTATION stage
                     StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new WrappedRunnable()
-                            {
-                                public void runMayThrow() throws IOException
                     {
-                        // send mutation to other replica
-                        sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, responseHandler, localDataCenter, false);
-                    }
+                        public void runMayThrow() throws IOException
+                        {
+                            // send mutation to other replica
+                            sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, responseHandler, localDataCenter, false, consistency_level);
+                        }
                     });
                 }
             }
@@ -541,7 +537,7 @@ public class StorageProxy implements Sto
             {
                 endpoints = endpoints.subList(0, handler.blockfor);
             }
-            
+
             // The data-request message is sent to dataPoint, the node that will actually get
             // the data for us. The other replicas are only sent a digest query.
             ReadCommand digestCommand = null;
@@ -556,7 +552,7 @@ public class StorageProxy implements Sto
             {
                 if (logger.isDebugEnabled())
                     logger.debug("reading data for " + command + " locally");
-                StageManager.getStage(Stage.READ).submit(new WeakReadLocalRunnable(command, handler));
+                StageManager.getStage(Stage.READ).submit(new LocalReadRunnable(command, handler));
             }
             else
             {
@@ -575,7 +571,7 @@ public class StorageProxy implements Sto
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("reading digest for " + command + " locally");
-                    StageManager.getStage(Stage.READ).submit(new WeakReadLocalRunnable(digestCommand, handler));
+                    StageManager.getStage(Stage.READ).submit(new LocalReadRunnable(digestCommand, handler));
                 }
                 else
                 {
@@ -644,12 +640,13 @@ public class StorageProxy implements Sto
         return rows;
     }
 
-    static class WeakReadLocalRunnable extends WrappedRunnable
+    static class LocalReadRunnable extends WrappedRunnable
     {
         private final ReadCommand command;
         private final ReadCallback<Row> handler;
+        private final long start = System.currentTimeMillis();
 
-        WeakReadLocalRunnable(ReadCommand command, ReadCallback<Row> handler)
+        LocalReadRunnable(ReadCommand command, ReadCallback<Row> handler)
         {
             this.command = command;
             this.handler = handler;
@@ -658,14 +655,15 @@ public class StorageProxy implements Sto
         protected void runMayThrow() throws IOException
         {
             if (logger.isDebugEnabled())
-                logger.debug("weakreadlocal reading " + command);
+                logger.debug("LocalReadRunnable reading " + command);
 
             Table table = Table.open(command.table);
             ReadResponse result = ReadVerbHandler.getResponse(command, command.getRow(table));
+            MessagingService.instance().addLatency(FBUtilities.getLocalAddress(), System.currentTimeMillis() - start);
             handler.response(result);
         }
     }
-    
+
     static <T> ReadCallback<T> getReadCallback(IResponseResolver<T> resolver, String table, ConsistencyLevel consistencyLevel)
     {
         if (consistencyLevel.equals(ConsistencyLevel.LOCAL_QUORUM) || consistencyLevel.equals(ConsistencyLevel.EACH_QUORUM))
@@ -680,8 +678,11 @@ public class StorageProxy implements Sto
     {
         ReadResponseResolver resolver = new ReadResponseResolver(command.table, command.key);
         RepairCallback<Row> handler = new RepairCallback<Row>(resolver, endpoints);
-        Message messageRepair = command.makeReadMessage();
-        MessagingService.instance().sendRR(messageRepair, endpoints, handler);
+        for (InetAddress endpoint : endpoints)
+        {
+            Message messageRepair = command.makeReadMessage();
+            MessagingService.instance().sendRR(messageRepair, endpoint, handler);
+        }
         return handler;
     }
 
@@ -705,28 +706,28 @@ public class StorageProxy implements Sto
             {
                 List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(command.keyspace, range.right);
 
-                if (consistency_level == ConsistencyLevel.ONE && liveEndpoints.contains(FBUtilities.getLocalAddress())) 
+                if (consistency_level == ConsistencyLevel.ONE && liveEndpoints.contains(FBUtilities.getLocalAddress()))
                 {
                     if (logger.isDebugEnabled())
                         logger.debug("local range slice");
                     ColumnFamilyStore cfs = Table.open(command.keyspace).getColumnFamilyStore(command.column_family);
-                    try 
+                    try
                     {
                         rows.addAll(cfs.getRangeSlice(command.super_column,
                                                     range,
                                                     command.max_keys,
                                                     QueryFilter.getFilter(command.predicate, cfs.getComparator())));
-                    } 
-                    catch (ExecutionException e) 
+                    }
+                    catch (ExecutionException e)
                     {
                         throw new RuntimeException(e.getCause());
-                    } 
-                    catch (InterruptedException e) 
+                    }
+                    catch (InterruptedException e)
                     {
                         throw new AssertionError(e);
-                    }           
+                    }
                 }
-                else 
+                else
                 {
                     DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getLocalAddress(), liveEndpoints);
                     RangeSliceCommand c2 = new RangeSliceCommand(command.keyspace, command.column_family, command.super_column, command.predicate, range, command.max_keys);
@@ -737,7 +738,7 @@ public class StorageProxy implements Sto
                     AbstractReplicationStrategy rs = Table.open(command.keyspace).getReplicationStrategy();
                     ReadCallback<List<Row>> handler = getReadCallback(resolver, command.keyspace, consistency_level);
                     // TODO bail early if live endpoints can't satisfy requested consistency level
-                    for (InetAddress endpoint : liveEndpoints) 
+                    for (InetAddress endpoint : liveEndpoints)
                     {
                         MessagingService.instance().sendRR(message, endpoint, handler);
                         if (logger.isDebugEnabled())
@@ -746,23 +747,23 @@ public class StorageProxy implements Sto
                     // TODO read repair on remaining replicas?
 
                     // if we're done, great, otherwise, move to the next range
-                    try 
+                    try
                     {
-                        if (logger.isDebugEnabled()) 
+                        if (logger.isDebugEnabled())
                         {
-                            for (Row row : handler.get()) 
+                            for (Row row : handler.get())
                             {
                                 logger.debug("range slices read " + row.key);
                             }
                         }
                         rows.addAll(handler.get());
-                    } 
-                    catch (DigestMismatchException e) 
+                    }
+                    catch (DigestMismatchException e)
                     {
                         throw new AssertionError(e); // no digests in range slices yet
                     }
                 }
-            
+
                 if (rows.size() >= command.max_keys)
                     break;
             }
@@ -775,7 +776,7 @@ public class StorageProxy implements Sto
     }
 
     /**
-     * initiate a request/response session with each live node to check whether or not everybody is using the same 
+     * initiate a request/response session with each live node to check whether or not everybody is using the same
      * migration id. This is useful for determining if a schema change has propagated through the cluster. Disagreement
      * is assumed if any node fails to respond.
      */
@@ -784,33 +785,38 @@ public class StorageProxy implements Sto
         final String myVersion = DatabaseDescriptor.getDefsVersion().toString();
         final Map<InetAddress, UUID> versions = new ConcurrentHashMap<InetAddress, UUID>();
         final Set<InetAddress> liveHosts = Gossiper.instance.getLiveMembers();
-        final Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
         final CountDownLatch latch = new CountDownLatch(liveHosts.size());
-        // an empty message acts as a request to the SchemaCheckVerbHandler.
-        MessagingService.instance().sendRR(msg, liveHosts, new IAsyncCallback()
+
+        IAsyncCallback cb = new IAsyncCallback()
         {
-            public void response(Message msg)
+            public void response(Message message)
             {
                 // record the response from the remote node.
-                logger.debug("Received schema check response from " + msg.getFrom().getHostAddress());
-                UUID theirVersion = UUID.fromString(new String(msg.getMessageBody()));
-                versions.put(msg.getFrom(), theirVersion);
+                logger.debug("Received schema check response from " + message.getFrom().getHostAddress());
+                UUID theirVersion = UUID.fromString(new String(message.getMessageBody()));
+                versions.put(message.getFrom(), theirVersion);
                 latch.countDown();
             }
-        });
-        
+        };
+        // an empty message acts as a request to the SchemaCheckVerbHandler.
+        for (InetAddress endpoint : liveHosts)
+        {
+            Message message = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.SCHEMA_CHECK, ArrayUtils.EMPTY_BYTE_ARRAY);
+            MessagingService.instance().sendRR(message, endpoint, cb);
+        }
+
         try
         {
             // wait for as long as possible. timeout-1s if possible.
             latch.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-        } 
-        catch (InterruptedException ex) 
+        }
+        catch (InterruptedException ex)
         {
             throw new AssertionError("This latch shouldn't have been interrupted.");
         }
-        
+
         logger.debug("My version is " + myVersion);
-        
+
         // maps versions to hosts that are on that version.
         Map<String, List<String>> results = new HashMap<String, List<String>>();
         Iterable<InetAddress> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(), Gossiper.instance.getUnreachableMembers());
@@ -836,10 +842,10 @@ public class StorageProxy implements Sto
             for (String host : entry.getValue())
                 logger.debug("%s disagrees (%s)", host, entry.getKey());
         }
-        
+
         if (results.size() == 1)
             logger.debug("Schemas are in agreement.");
-        
+
         return results;
     }
 
@@ -881,7 +887,7 @@ public class StorageProxy implements Sto
 
         return ranges;
     }
-    
+
     private static boolean randomlyReadRepair(ReadCommand command)
     {
         CFMetaData cfmd = DatabaseDescriptor.getTableMetaData(command.table).get(command.getColumnFamilyName());
@@ -1007,11 +1013,11 @@ public class StorageProxy implements Sto
             // collect replies and resolve according to consistency level
             RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(keyspace, liveEndpoints);
             ReadCallback<List<Row>> handler = getReadCallback(resolver, keyspace, consistency_level);
-            
+
             // bail early if live endpoints can't satisfy requested consistency level
             if(handler.blockfor > liveEndpoints.size())
                 throw new UnavailableException();
-            
+
             IndexScanCommand command = new IndexScanCommand(keyspace, column_family, index_clause, column_predicate, range);
             Message message = command.getMessage();
             for (InetAddress endpoint : liveEndpoints)
@@ -1101,8 +1107,11 @@ public class StorageProxy implements Sto
         // Send out the truncate calls and track the responses with the callbacks.
         logger.debug("Starting to send truncate messages to hosts {}", allEndpoints);
         Truncation truncation = new Truncation(keyspace, cfname);
-        Message message = truncation.makeTruncationMessage();
-        MessagingService.instance().sendRR(message, allEndpoints, responseHandler);
+        for (InetAddress endpoint : allEndpoints)
+        {
+            Message message = truncation.makeTruncationMessage();
+            MessagingService.instance().sendRR(message, endpoint, responseHandler);
+        }
 
         // Wait for all
         logger.debug("Sent all truncate messages, now waiting for {} responses", blockFor);
@@ -1157,6 +1166,6 @@ public class StorageProxy implements Sto
 
     private interface WritePerformer
     {
-        public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter) throws IOException;
+        public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ByteBufferUtil.java Fri Jan 28 16:27:02 2011
@@ -135,7 +135,12 @@ public class ByteBufferUtil
     public static byte[] getArray(ByteBuffer b, int start, int length)
     {
         if (b.hasArray())
-            return Arrays.copyOfRange(b.array(), start + b.arrayOffset(), start + length + b.arrayOffset());
+        {
+            if (b.arrayOffset() == 0 && start == 0 && length == b.array().length)
+                return b.array();
+            else
+                return Arrays.copyOfRange(b.array(), start + b.arrayOffset(), start + length + b.arrayOffset());
+        }
 
         byte[] bytes = new byte[length];
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1064763&r1=1064762&r2=1064763&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Fri Jan 28 16:27:02 2011
@@ -18,10 +18,7 @@
 
 package org.apache.cassandra.utils;
 
-import java.util.Enumeration;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.*;
 import java.util.concurrent.Callable;
 
 import com.google.common.base.Function;
@@ -33,7 +30,7 @@ import org.cliffc.high_scale_lib.NonBloc
 public class ExpiringMap<K, V>
 {
     private static final Logger logger = LoggerFactory.getLogger(ExpiringMap.class);
-    private final Function<K, ?> postExpireHook;
+    private final Function<Pair<K,V>, ?> postExpireHook;
 
     private static class CacheableObject<T>
     {
@@ -69,18 +66,12 @@ public class ExpiringMap<K, V>
         @Override
         public void run()
         {
-            synchronized (cache)
+            for (Map.Entry<K, CacheableObject> entry : cache.entrySet())
             {
-                Enumeration<K> e = cache.keys();
-                while (e.hasMoreElements())
+                if (entry.getValue().isReadyToDie(expiration))
                 {
-                    K key = e.nextElement();
-                    CacheableObject co = cache.get(key);
-                    if (co != null && co.isReadyToDie(expiration))
-                    {
-                        cache.remove(key);
-                        postExpireHook.apply(key);
-                    }
+                    cache.remove(entry.getKey());
+                    postExpireHook.apply(new Pair(entry.getKey(), entry.getValue().getValue()));
                 }
             }
         }
@@ -99,7 +90,7 @@ public class ExpiringMap<K, V>
      *
      * @param expiration the TTL for objects in the cache in milliseconds
      */
-    public ExpiringMap(long expiration, Function<K, ?> postExpireHook)
+    public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
     {
         this.postExpireHook = postExpireHook;
         if (expiration <= 0)



Mime
View raw message