cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1163760 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/concurrent/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/locator/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apa...
Date Wed, 31 Aug 2011 19:33:04 GMT
Author: jbellis
Date: Wed Aug 31 19:33:03 2011
New Revision: 1163760

URL: http://svn.apache.org/viewvc?rev=1163760&view=rev
Log:
generate hints for replicas that timeout
patch by Patricio Echague and jbellis for CASSANDRA-2034

Added:
    cassandra/trunk/src/java/org/apache/cassandra/concurrent/CreationTimeAwareFuture.java
    cassandra/trunk/src/java/org/apache/cassandra/net/CallbackInfo.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
    cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed Aug 31 19:33:03 2011
@@ -51,6 +51,8 @@
  * Fix streaming over SSL when compressed SSTable involved (CASSANDRA-3051)
  * Add support for pluggable secondary index implementations (CASSANDRA-3078)
  * remove compaction_thread_priority setting (CASSANDRA-3104)
+ * generate hints for replicas that timeout, not just replicas that are known
+   to be down before starting (CASSANDRA-2034)
 
 
 0.8.5

Added: cassandra/trunk/src/java/org/apache/cassandra/concurrent/CreationTimeAwareFuture.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/CreationTimeAwareFuture.java?rev=1163760&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/CreationTimeAwareFuture.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/CreationTimeAwareFuture.java Wed Aug 31 19:33:03 2011
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Decorates a {@link Future} (typically a {@link FutureTask}) so that the timeout given to
+ * {@link #get(long, TimeUnit)} is measured from when this wrapper was constructed rather
+ * than from when get(...) is called.
+ * <p>
+ * Waiting on several such futures in turn with the same timeout therefore bounds each
+ * wait by the time remaining since that future was created.
+ *
+ * @param <V> the result type of the wrapped future
+ */
+public class CreationTimeAwareFuture<V> implements Future<V>
+{
+    private final long creationTimeNanos = System.nanoTime(); // monotonic: immune to wall-clock jumps
+    private final Future<V> future;
+
+    public CreationTimeAwareFuture(Future<V> future)
+    {
+        this.future = future;
+    }
+
+    /**
+     * Waits for the wrapped future, deducting the time elapsed since construction from the
+     * timeout; once that budget is used up the future is only polled (zero timeout).
+     */
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+    {
+        long remainingNanos = unit.toNanos(timeout) - (System.nanoTime() - creationTimeNanos);
+        return future.get(Math.max(0L, remainingNanos), TimeUnit.NANOSECONDS);
+    }
+
+    public boolean cancel(boolean mayInterruptIfRunning)
+    {
+        return future.cancel(mayInterruptIfRunning);
+    }
+
+    public boolean isCancelled()
+    {
+        return future.isCancelled();
+    }
+
+    public boolean isDone()
+    {
+        return future.isDone();
+    }
+
+    public V get() throws InterruptedException, ExecutionException
+    {
+        return future.get();
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutation.java Wed Aug 31 19:33:03 2011
@@ -44,7 +44,6 @@ import org.apache.cassandra.utils.UUIDGe
 public class RowMutation implements IMutation, MessageProducer
 {
     private static RowMutationSerializer serializer_ = new RowMutationSerializer();
-    public static final String HINT = "HINT";
     public static final String FORWARD_HEADER = "FORWARD";
 
     public static RowMutationSerializer serializer()
@@ -354,7 +353,7 @@ public class RowMutation implements IMut
         }
     }
 
-    static RowMutation fromBytes(byte[] raw, int version) throws IOException
+    public static RowMutation fromBytes(byte[] raw, int version) throws IOException
     {
         RowMutation rm = serializer_.deserialize(new DataInputStream(new FastByteArrayInputStream(raw)), version);
         boolean hasCounters = false;

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java Wed Aug 31 19:33:03 2011
@@ -47,22 +47,6 @@ public class RowMutationVerbHandler impl
             if (logger_.isDebugEnabled())
               logger_.debug("Applying " + rm);
 
-            /* Check if there were any hints in this message */
-            byte[] hintedBytes = message.getHeader(RowMutation.HINT);
-            if (hintedBytes != null)
-            {
-                assert hintedBytes.length > 0;
-                DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(hintedBytes));
-                while (dis.available() > 0)
-                {
-                    ByteBuffer addressBytes = ByteBufferUtil.readWithShortLength(dis);
-                    if (logger_.isDebugEnabled())
-                        logger_.debug("Adding hint for " + InetAddress.getByName(ByteBufferUtil.string(addressBytes)));
-                    RowMutation hintedMutation = RowMutation.hintFor(rm, addressBytes);
-                    hintedMutation.apply();
-                }
-            }
-        
             // Check if there were any forwarding headers in this message
             byte[] forwardBytes = message.getHeader(RowMutation.FORWARD_HEADER);
             if (forwardBytes != null)

Modified: cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java Wed Aug 31 19:33:03 2011
@@ -25,15 +25,16 @@ import java.util.*;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
-import org.apache.cassandra.gms.Gossiper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
+import org.apache.cassandra.service.DatacenterWriteResponseHandler;
+import org.apache.cassandra.service.IWriteResponseHandler;
+import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.FBUtilities;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -113,20 +114,18 @@ public abstract class AbstractReplicatio
      */
     public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
 
-    public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints,
-                                                         Multimap<InetAddress, InetAddress> hintedEndpoints,
-                                                         ConsistencyLevel consistency_level)
+    public IWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistency_level)
     {
         if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
         {
             // block for in this context will be localnodes block.
-            return DatacenterWriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistency_level, table);
+            return DatacenterWriteResponseHandler.create(writeEndpoints, consistency_level, table);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
         {
-            return DatacenterSyncWriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistency_level, table);
+            return DatacenterSyncWriteResponseHandler.create(writeEndpoints, consistency_level, table);
         }
-        return WriteResponseHandler.create(writeEndpoints, hintedEndpoints, consistency_level, table);
+        return WriteResponseHandler.create(writeEndpoints, consistency_level, table);
     }
 
     /**
@@ -137,49 +136,6 @@ public abstract class AbstractReplicatio
      */
     public abstract int getReplicationFactor();
 
-    /**
-     * returns <tt>Multimap</tt> of {live destination: ultimate targets}, where if target is not the same
-     * as the destination, it is a "hinted" write, and will need to be sent to
-     * the ultimate target when it becomes alive again.
-     */
-    public Multimap<InetAddress, InetAddress> getHintedEndpoints(Collection<InetAddress> targets)
-    {
-        Multimap<InetAddress, InetAddress> map = HashMultimap.create(targets.size(), 1);
-
-        // first, add the live endpoints
-        for (InetAddress ep : targets)
-        {
-            if (FailureDetector.instance.isAlive(ep))
-                map.put(ep, ep);
-        }
-
-        // if everything was alive or we're not doing HH on this keyspace, stop with just the live nodes
-        if (map.size() == targets.size() || !StorageProxy.isHintedHandoffEnabled())
-            return map;
-
-        // Assign dead endpoints to be hinted to the local node.
-        //
-        // we do a 2nd pass on targets instead of using temporary storage,
-        // to optimize for the common case (everything was alive).
-        InetAddress localAddress = FBUtilities.getBroadcastAddress();
-        for (InetAddress ep : targets)
-        {
-            if (map.containsKey(ep))
-                continue;
-            if (!StorageProxy.shouldHint(ep))
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("not hinting " + ep + " which has been down " + Gossiper.instance.getEndpointDowntime(ep) + "ms");
-                continue;
-            }
-
-            // We always store the hint on the coordinator node.
-            map.put(localAddress, ep);
-        }
-
-        return map;
-    }
-
     /*
      * NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below.
      * this is fine as long as we don't use this on any critical path.

Added: cassandra/trunk/src/java/org/apache/cassandra/net/CallbackInfo.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/CallbackInfo.java?rev=1163760&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/CallbackInfo.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/CallbackInfo.java Wed Aug 31 19:33:03 2011
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.net;
+
+import java.net.InetAddress;
+
+import org.apache.cassandra.service.StorageProxy;
+
+/**
+ * Encapsulates the callback information registered for an outstanding request: the
+ * target endpoint, the callback to notify, and optionally the message that was sent.
+ * Keeping the message lets a hint be written for it if the target's response times out.
+ */
+class CallbackInfo
+{
+    protected final InetAddress target;
+    protected final IMessageCallback callback;
+    /** The request sent to {@code target}; null means no hint will ever be written for it. */
+    protected final Message message;
+
+    /** Creates callback info without a retained message, so {@link #shouldHint()} is always false. */
+    public CallbackInfo(InetAddress target, IMessageCallback callback)
+    {
+        this(target, callback, null);
+    }
+
+    public CallbackInfo(InetAddress target, IMessageCallback callback, Message message)
+    {
+        this.target = target;
+        this.callback = callback;
+        this.message = message;
+    }
+
+    /**
+     * Assumes it is only called after the write of "message" to "target" has timed out.
+     *
+     * @return true if a message was retained for this callback and StorageProxy.shouldHint(target)
+     *         allows hinting that endpoint; false otherwise (the consistency level is not checked).
+     */
+    public boolean shouldHint()
+    {
+        return message != null && StorageProxy.shouldHint(target);
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Wed Aug 31 19:33:03 2011
@@ -27,6 +27,7 @@ import java.nio.channels.AsynchronousClo
 import java.nio.channels.ServerSocketChannel;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -38,22 +39,26 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.ILatencySubscriber;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.security.SSLFactory;
+import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.FileStreamTask;
 import org.apache.cassandra.streaming.StreamHeader;
 import org.apache.cassandra.utils.*;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
+
 public final class MessagingService implements MessagingServiceMBean
 {
     public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService";
@@ -68,7 +73,7 @@ public final class MessagingService impl
     static final int PROTOCOL_MAGIC = 0xCA552DFA;
 
     /* This records all the results mapped by message Id */
-    private final ExpiringMap<String, Pair<InetAddress, IMessageCallback>> callbacks;
+    private final ExpiringMap<String, CallbackInfo> callbacks;
 
     /* Lookup table for registering message handlers based on the verb. */
     private final Map<StorageService.Verb, IVerbHandler> verbHandlers_;
@@ -107,7 +112,7 @@ public final class MessagingService impl
     private final Map<String, AtomicLong> timeoutsPerHost = new HashMap<String, AtomicLong>();
     private final Map<String, AtomicLong> recentTimeoutsPerHost = new HashMap<String, AtomicLong>();
     private final List<ILatencySubscriber> subscribers = new ArrayList<ILatencySubscriber>();
-    private static final long DEFAULT_CALLBACK_TIMEOUT = (long) (1.1 * DatabaseDescriptor.getRpcTimeout());
+    private static final long DEFAULT_CALLBACK_TIMEOUT = DatabaseDescriptor.getRpcTimeout();
 
     private static class MSHandle
     {
@@ -139,14 +144,14 @@ public final class MessagingService impl
         };
         StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
-        Function<Pair<String, Pair<InetAddress, IMessageCallback>>, ?> timeoutReporter = new Function<Pair<String, Pair<InetAddress, IMessageCallback>>, Object>()
+        Function<Pair<String, CallbackInfo>, ?> timeoutReporter = new Function<Pair<String, CallbackInfo>, Object>()
         {
-            public Object apply(Pair<String, Pair<InetAddress, IMessageCallback>> pair)
+            public Object apply(Pair<String, CallbackInfo> pair)
             {
-                Pair<InetAddress, IMessageCallback> expiredValue = pair.right;
-                maybeAddLatency(expiredValue.right, expiredValue.left, (double) DatabaseDescriptor.getRpcTimeout());
+                CallbackInfo expiredCallbackInfo = pair.right;
+                maybeAddLatency(expiredCallbackInfo.callback, expiredCallbackInfo.target, (double) DatabaseDescriptor.getRpcTimeout());
                 totalTimeouts++;
-                String ip = expiredValue.left.getHostAddress();
+                String ip = expiredCallbackInfo.target.getHostAddress();
                 AtomicLong c = timeoutsPerHost.get(ip);
                 if (c == null)
                     c = timeoutsPerHost.put(ip, new AtomicLong());
@@ -156,10 +161,18 @@ public final class MessagingService impl
                 if (recentTimeoutsPerHost.get(ip) == null)
                     recentTimeoutsPerHost.put(ip, new AtomicLong());
 
+                if (expiredCallbackInfo.shouldHint())
+                {
+                    // Trigger hints for expired mutation message.
+                    assert expiredCallbackInfo.message != null;
+                    scheduleMutationHint(expiredCallbackInfo.message, expiredCallbackInfo.target);
+                }
+
                 return null;
             }
         };
-        callbacks = new ExpiringMap<String, Pair<InetAddress, IMessageCallback>>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);
+
+        callbacks = new ExpiringMap<String, CallbackInfo>(DEFAULT_CALLBACK_TIMEOUT, timeoutReporter);
 
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
@@ -172,6 +185,21 @@ public final class MessagingService impl
         }
     }
 
+    /** Re-reads an expired mutation message and schedules a local hint for its target; null if it cannot be read. */
+    private Future<?> scheduleMutationHint(Message mutationMessage, InetAddress mutationTarget)
+    {
+        try
+        {
+            RowMutation rm = RowMutation.fromBytes(mutationMessage.getMessageBody(), mutationMessage.getVersion());
+            return StorageProxy.scheduleLocalHint(rm, mutationTarget, null, null);
+        }
+        catch (IOException e)
+        {
+            logger_.error("Unable to deserialize mutation when writing hint for: " + mutationTarget, e);
+            return null;
+        }
+    }
+
     /**
      * Track latency information for the dynamic snitch
      * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in
@@ -292,17 +320,19 @@ public final class MessagingService impl
         return verbHandlers_.get(type);
     }
 
-    private void addCallback(IMessageCallback cb, String messageId, InetAddress to)
+    private void addCallback(IMessageCallback cb, String messageId, Message message, InetAddress to, long timeout)
     {
-        addCallback(cb, messageId, to, DEFAULT_CALLBACK_TIMEOUT);
-    }
+        CallbackInfo previous;
+
+        // If HH is enabled and this is a mutation message => store the message to track for potential hints.
+        if (DatabaseDescriptor.hintedHandoffEnabled() && message.getVerb() == StorageService.Verb.MUTATION)
+            previous = callbacks.put(messageId, new CallbackInfo(to, cb, message), timeout);
+        else
+            previous = callbacks.put(messageId, new CallbackInfo(to, cb), timeout);
 
-    private void addCallback(IMessageCallback cb, String messageId, InetAddress to, long timeout)
-    {
-        Pair<InetAddress, IMessageCallback> previous = callbacks.put(messageId, new Pair<InetAddress, IMessageCallback>(to, cb), timeout);
         assert previous == null;
     }
-    
+
     private static AtomicInteger idGen = new AtomicInteger(0);
     // TODO make these integers to avoid unnecessary int -> string -> int conversions
     private static String nextId()
@@ -321,6 +351,8 @@ public final class MessagingService impl
     /**
      * Send a message to a given endpoint. This method specifies a callback
      * which is invoked with the actual response.
+     * Also holds the message (only mutation messages) to determine if it
+     * needs to trigger a hint (uses StorageProxy for that).
      * @param message message to be sent.
      * @param to endpoint to which the message needs to be sent
      * @param cb callback interface which is used to pass the responses or
@@ -332,7 +364,7 @@ public final class MessagingService impl
     public String sendRR(Message message, InetAddress to, IMessageCallback cb, long timeout)
     {
         String id = nextId();
-        addCallback(cb, id, to, timeout);
+        addCallback(cb, id, message, to, timeout);
         sendOneWay(message, id, to);
         return id;
     }
@@ -424,16 +456,21 @@ public final class MessagingService impl
         subscribers.add(subcriber);
     }
 
-    /** blocks until the processing pools are empty and done. */
-    public void waitFor() throws InterruptedException
+    public void waitForStreaming() throws InterruptedException
     {
-        while (!streamExecutor_.isTerminated())
-            streamExecutor_.awaitTermination(5, TimeUnit.SECONDS);
+        streamExecutor_.awaitTermination(24, TimeUnit.HOURS);
+    }
+
+    public void clearCallbacksUnsafe()
+    {
+        callbacks.clear();
     }
 
     public void shutdown()
     {
         logger_.info("Shutting down MessageService...");
+        // We may need to schedule hints on the mutation stage, so it's erroneous to shut down the mutation stage first
+        assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
         try
         {
@@ -444,10 +481,10 @@ public final class MessagingService impl
             throw new IOError(e);
         }
 
-        streamExecutor_.shutdownNow();
-        callbacks.shutdown();
+        streamExecutor_.shutdown();
 
-        logger_.info("Shutdown complete (no further commands will be processed)");
+        logger_.info("Waiting for in-progress requests to complete");
+        callbacks.shutdown();
     }
 
     public void receive(Message message, String id)
@@ -466,7 +503,7 @@ public final class MessagingService impl
         stage.execute(runnable);
     }
 
-    public Pair<InetAddress, IMessageCallback> removeRegisteredCallback(String messageId)
+    public CallbackInfo removeRegisteredCallback(String messageId)
     {
         return callbacks.remove(messageId);
     }
@@ -695,4 +732,5 @@ public final class MessagingService impl
         }
         return result;
     }
+
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/ResponseVerbHandler.java Wed Aug 31 19:33:03 2011
@@ -18,13 +18,9 @@
 
 package org.apache.cassandra.net;
 
-import java.net.InetAddress;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.utils.Pair;
-
 public class ResponseVerbHandler implements IVerbHandler
 {
     private static final Logger logger_ = LoggerFactory.getLogger( ResponseVerbHandler.class );
@@ -32,14 +28,14 @@ public class ResponseVerbHandler impleme
     public void doVerb(Message message, String id)
     {     
         double age = System.currentTimeMillis() - MessagingService.instance().getRegisteredCallbackAge(id);
-        Pair<InetAddress, IMessageCallback> pair = MessagingService.instance().removeRegisteredCallback(id);
-        if (pair == null)
+        CallbackInfo callbackInfo = MessagingService.instance().removeRegisteredCallback(id);
+        if (callbackInfo == null)
         {
             logger_.debug("Callback already removed for {}", id);
             return;
         }
 
-        IMessageCallback cb = pair.right;
+        IMessageCallback cb = callbackInfo.callback;
         MessagingService.instance().maybeAddLatency(cb, message.getFrom(), age);
 
         if (cb instanceof IAsyncCallback)

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java Wed Aug 31 19:33:03 2011
@@ -22,17 +22,19 @@ package org.apache.cassandra.service;
 
 
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import com.google.common.collect.Multimap;
-
+import org.apache.cassandra.concurrent.CreationTimeAwareFuture;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SimpleCondition;
 
 public abstract class AbstractWriteResponseHandler implements IWriteResponseHandler
@@ -40,19 +42,21 @@ public abstract class AbstractWriteRespo
     protected final SimpleCondition condition = new SimpleCondition();
     protected final long startTime;
     protected final Collection<InetAddress> writeEndpoints;
-    protected final Multimap<InetAddress, InetAddress> hintedEndpoints;
     protected final ConsistencyLevel consistencyLevel;
+    protected List<CreationTimeAwareFuture<?>> hintFutures;
 
-    protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel)
+    protected AbstractWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel)
     {
         startTime = System.currentTimeMillis();
         this.consistencyLevel = consistencyLevel;
-        this.hintedEndpoints = hintedEndpoints;
         this.writeEndpoints = writeEndpoints;
     }
 
     public void get() throws TimeoutException
     {
+        if (hintFutures != null)
+            waitForHints(hintFutures);
+
         long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime);
         boolean success;
         try
@@ -70,6 +74,29 @@ public abstract class AbstractWriteRespo
         }
     }
 
+    /** Registers a hint future that get() waits on first; the list is created lazily, sized to the write endpoints. */
+    public void addFutureForHint(CreationTimeAwareFuture<?> hintFuture)
+    {
+        if (hintFutures == null)
+            hintFutures = new ArrayList<CreationTimeAwareFuture<?>>(writeEndpoints.size());
+        hintFutures.add(hintFuture);
+    }
+
+    /** Waits on the hint futures via FBUtilities.waitOnFutures with the RPC timeout; see catch block for errors. */
+    protected static void waitForHints(List<CreationTimeAwareFuture<?>> hintFutures) throws TimeoutException
+    {
+        try
+        {
+            FBUtilities.waitOnFutures(hintFutures, DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+        }
+        catch (RuntimeException e)
+        {
+            if (e.getCause() instanceof ExecutionException) // node is overwhelmed: make the client back off
+                throw (TimeoutException) new TimeoutException().initCause(e);
+            throw e;
+        }
+    }
+
     /** null message means "response from local write" */
     public abstract void response(Message msg);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java Wed Aug 31 19:33:03 2011
@@ -30,10 +30,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Multimap;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
@@ -57,10 +56,10 @@ public class DatacenterSyncWriteResponse
 	private final NetworkTopologyStrategy strategy;
     private HashMap<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
 
-    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(writeEndpoints, hintedEndpoints, consistencyLevel);
+        super(writeEndpoints, consistencyLevel);
         assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
 
         strategy = (NetworkTopologyStrategy) Table.open(table).getReplicationStrategy();
@@ -72,9 +71,9 @@ public class DatacenterSyncWriteResponse
         }
     }
 
-    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    /**
+     * Creates the EACH_QUORUM write handler. Liveness is now judged by the failure detector over
+     * writeEndpoints rather than from a precomputed hinted-endpoint map.
+     */
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
-        return new DatacenterSyncWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        return new DatacenterSyncWriteResponseHandler(writeEndpoints, consistencyLevel, table);
     }
 
     public void response(Message message)
@@ -96,13 +95,14 @@ public class DatacenterSyncWriteResponse
     }
 
     public void assureSufficientLiveNodes() throws UnavailableException
-    {   
-		Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>();
+    {
+        Map<String, AtomicInteger> dcEndpoints = new HashMap<String, AtomicInteger>();
         for (String dc: strategy.getDatacenters())
             dcEndpoints.put(dc, new AtomicInteger());
-        for (InetAddress destination : hintedEndpoints.keySet())
+
+        for (InetAddress destination : writeEndpoints)
         {
-            if (writeEndpoints.contains(destination))
+            if (FailureDetector.instance.isAlive(destination))
             {
                 // figure out the destination dc
                 String destinationDC = snitch.getDatacenter(destination);

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java Wed Aug 31 19:33:03 2011
@@ -27,10 +27,9 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.Collection;
 
-import com.google.common.collect.Multimap;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.Message;
@@ -51,15 +50,15 @@ public class DatacenterWriteResponseHand
         localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
     }
 
-    protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    /** LOCAL_QUORUM only; the liveness check counts only live write endpoints in the local datacenter. */
+    protected DatacenterWriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
-        super(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        super(writeEndpoints, consistencyLevel, table);
         assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
     }
 
-    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    /** Creates the LOCAL_QUORUM write handler for the given write endpoints. */
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
-        return new DatacenterWriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        return new DatacenterWriteResponseHandler(writeEndpoints, consistencyLevel, table);
     }
 
     @Override
@@ -84,9 +83,9 @@ public class DatacenterWriteResponseHand
     public void assureSufficientLiveNodes() throws UnavailableException
     {
         int liveNodes = 0;
-        for (InetAddress destination : hintedEndpoints.keySet())
+        for (InetAddress destination : writeEndpoints)
         {
-            if (localdc.equals(snitch.getDatacenter(destination)) && writeEndpoints.contains(destination))
+            if (localdc.equals(snitch.getDatacenter(destination)) && FailureDetector.instance.isAlive(destination))
                 liveNodes++;
         }
 
@@ -95,4 +94,5 @@ public class DatacenterWriteResponseHand
             throw new UnavailableException();
         }
     }
+
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/IWriteResponseHandler.java Wed Aug 31 19:33:03 2011
@@ -22,11 +22,13 @@ package org.apache.cassandra.service;
 
 import java.util.concurrent.TimeoutException;
 
+import org.apache.cassandra.concurrent.CreationTimeAwareFuture;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.thrift.UnavailableException;
 
 public interface IWriteResponseHandler extends IAsyncCallback
 {
+    /**
+     * Blocks until the write tracked by this handler has completed. AbstractWriteResponseHandler
+     * first waits for any registered hint writes, then for responses within what remains of the
+     * RPC timeout.
+     *
+     * @throws TimeoutException if the write does not complete in time or a hint write fails
+     */
     public void get() throws TimeoutException;
+    /** Registers a pending local hint write that {@link #get()} must also wait for. */
+    public void addFutureForHint(CreationTimeAwareFuture<?> hintFuture);
+    /**
+     * Throws up front when too few write endpoints are alive to satisfy the consistency level.
+     * StorageProxy calls this before sending anything.
+     */
     public void assureSufficientLiveNodes() throws UnavailableException;
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Wed Aug 31 19:33:03 2011
@@ -18,12 +18,15 @@
 
 package org.apache.cassandra.service;
 
-import java.io.*;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
@@ -33,12 +36,12 @@ import com.google.common.collect.Multima
 
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.net.*;
-
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.CreationTimeAwareFuture;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -48,16 +51,18 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
-import org.apache.cassandra.thrift.IndexClause;
-import org.apache.cassandra.thrift.InvalidRequestException;
-import org.apache.cassandra.thrift.SlicePredicate;
-import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.LatencyTracker;
+import org.apache.cassandra.utils.Pair;
+
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -67,8 +72,7 @@ public class StorageProxy implements Sto
     private static final LatencyTracker readStats = new LatencyTracker();
     private static final LatencyTracker rangeStats = new LatencyTracker();
     private static final LatencyTracker writeStats = new LatencyTracker();
-    private static boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
-    private static int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
+
     public static final String UNREACHABLE = "UNREACHABLE";
 
     private static final WritePerformer standardWritePerformer;
@@ -77,6 +81,12 @@ public class StorageProxy implements Sto
 
     public static final StorageProxy instance = new StorageProxy();
 
+    private static volatile boolean hintedHandoffEnabled = DatabaseDescriptor.hintedHandoffEnabled();
+    private static volatile int maxHintWindow = DatabaseDescriptor.getMaxHintWindow();
+    private static volatile int maxHintsInProgress = 1024 * Runtime.getRuntime().availableProcessors();
+    private static final AtomicInteger hintsInProgress = new AtomicInteger();
+    private static final AtomicLong totalHints = new AtomicLong();
+
     private StorageProxy() {}
 
     static
@@ -93,10 +103,15 @@ public class StorageProxy implements Sto
 
         standardWritePerformer = new WritePerformer()
         {
-            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
+            // Non-counter writes: targets the failure detector reports alive receive the mutation.
+            // The others get a local hint when shouldHint allows it (see sendToHintedEndpoints).
+            public void apply(IMutation mutation,
+                              Collection<InetAddress> targets,
+                              IWriteResponseHandler responseHandler,
+                              String localDataCenter,
+                              ConsistencyLevel consistency_level)
+            throws IOException, TimeoutException
             {
                 assert mutation instanceof RowMutation;
-                sendToHintedEndpoints((RowMutation) mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+                sendToHintedEndpoints((RowMutation) mutation, targets, responseHandler, localDataCenter, consistency_level);
             }
         };
 
@@ -108,24 +123,34 @@ public class StorageProxy implements Sto
          */
         counterWritePerformer = new WritePerformer()
         {
-            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
+            // runs the counter write task synchronously on the calling thread
+            public void apply(IMutation mutation,
+                              Collection<InetAddress> targets,
+                              IWriteResponseHandler responseHandler,
+                              String localDataCenter,
+                              ConsistencyLevel consistency_level) 
+            throws IOException
             {
                 if (logger.isDebugEnabled())
                     logger.debug("insert writing local & replicate " + mutation.toString(true));
 
-                Runnable runnable = counterWriteTask(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+                Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                 runnable.run();
             }
         };
 
         counterWriteOnCoordinatorPerformer = new WritePerformer()
         {
-            public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException
+            // hands the counter write task to the MUTATION stage instead of running it inline
+            public void apply(IMutation mutation,
+                              Collection<InetAddress> targets,
+                              IWriteResponseHandler responseHandler,
+                              String localDataCenter,
+                              ConsistencyLevel consistency_level)
+            throws IOException
             {
                 if (logger.isDebugEnabled())
                     logger.debug("insert writing local & replicate " + mutation.toString(true));
 
-                Runnable runnable = counterWriteTask(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+                Runnable runnable = counterWriteTask(mutation, targets, responseHandler, localDataCenter, consistency_level);
                 StageManager.getStage(Stage.MUTATION).execute(runnable);
             }
         };
@@ -139,7 +164,7 @@ public class StorageProxy implements Sto
      *
      * @param mutations the mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
-    */
+     */
     public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, TimeoutException
     {
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
@@ -162,11 +187,13 @@ public class StorageProxy implements Sto
                     responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer));
                 }
             }
-            // wait for writes.  throws timeoutexception if necessary
+
+            // wait for writes.  throws TimeoutException if necessary
             for (IWriteResponseHandler responseHandler : responseHandlers)
             {
                 responseHandler.get();
             }
+
         }
         catch (TimeoutException ex)
         {
@@ -202,20 +229,23 @@ public class StorageProxy implements Sto
      * given the list of write endpoints (either standardWritePerformer for
      * standard writes or counterWritePerformer for counter writes).
      */
-    public static IWriteResponseHandler performWrite(IMutation mutation, ConsistencyLevel consistency_level, String localDataCenter, WritePerformer performer) throws UnavailableException, TimeoutException, IOException
+    public static IWriteResponseHandler performWrite(IMutation mutation,
+                                                     ConsistencyLevel consistency_level,
+                                                     String localDataCenter,
+                                                     WritePerformer performer)
+    throws UnavailableException, TimeoutException, IOException
     {
         String table = mutation.getTable();
         AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
 
         Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, mutation.key());
-        Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
+        // There is no precomputed hinted-endpoint map any more; hinting is decided per target at send time.
 
-        IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, consistency_level);
+        IWriteResponseHandler responseHandler = rs.getWriteResponseHandler(writeEndpoints, consistency_level);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
 
-        performer.apply(mutation, hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+        // NOTE(review): writeEndpoints goes to both the handler and the performer.
+        // counterWriteTask removes the local address from it, so it must be a mutable collection
+        // that the handler presumably does not rely on afterwards. Confirm.
+        performer.apply(mutation, writeEndpoints, responseHandler, localDataCenter, consistency_level);
         return responseHandler;
     }
 
@@ -226,23 +256,37 @@ public class StorageProxy implements Sto
         return ss.getTokenMetadata().getWriteEndpoints(StorageService.getPartitioner().getToken(key), table, naturalEndpoints);
     }
 
-    private static void sendToHintedEndpoints(final RowMutation rm, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
-    throws IOException
+    /**
+     * Send the mutation to its targets: apply it locally when this node is a target, and write a hint
+     * for any target that is not available.
+     *
+     * Note about hints:
+     *
+     * | Hinted Handoff | Consist. Level |
+     * | on             |       >=1      | --> wait for hints. We DO NOT notify the handler with handler.response() for hints; 
+     * | on             |       ANY      | --> wait for hints. Responses count towards consistency.
+     * | off            |       >=1      | --> DO NOT fire hints. And DO NOT wait for them to complete.
+     * | off            |       ANY      | --> DO NOT fire hints. And DO NOT wait for them to complete.
+     *
+     * @throws TimeoutException if the hints cannot be written/enqueued 
+     */
+    private static void sendToHintedEndpoints(final RowMutation rm, 
+                                              Collection<InetAddress> targets,
+                                              IWriteResponseHandler responseHandler,
+                                              String localDataCenter,
+                                              ConsistencyLevel consistency_level)
+    throws IOException, TimeoutException
     {
         // Multimap that holds onto all the messages and addresses meant for a specific datacenter
-        Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(hintedEndpoints.size());
+        Map<String, Multimap<Message, InetAddress>> dcMessages = new HashMap<String, Multimap<Message, InetAddress>>(targets.size());
         MessageProducer producer = new CachingMessageProducer(rm);
 
-        for (Map.Entry<InetAddress, Collection<InetAddress>> entry : hintedEndpoints.asMap().entrySet())
+        for (InetAddress destination : targets)
         {
-            InetAddress destination = entry.getKey();
-            Collection<InetAddress> targets = entry.getValue();
-
-            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
-
-            if (targets.size() == 1 && targets.iterator().next().equals(destination))
+            if (FailureDetector.instance.isAlive(destination))
             {
-                // unhinted writes
+                String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination);
+
                 if (destination.equals(FBUtilities.getBroadcastAddress()))
                 {
                     insertLocal(rm, responseHandler);
@@ -265,28 +309,67 @@ public class StorageProxy implements Sto
             }
             else
             {
-                // hinted messages are unique, so there is no point to adding a hop by forwarding via another node.
-                // thus, we use sendRR/sendOneWay directly here.
-                Message hintedMessage = rm.getMessage(Gossiper.instance.getVersion(destination));
-                for (InetAddress target : targets)
-                {
-                    if (!target.equals(destination))
-                    {
-                        addHintHeader(hintedMessage, target);
-                        if (logger.isDebugEnabled())
-                            logger.debug("insert writing key " + ByteBufferUtil.bytesToHex(rm.key()) + " to " + destination + " for " + target);
-                    }
-                }
-                // non-destination hints are part of the callback and count towards consistency only under CL.ANY
-                if (targets.contains(destination) || consistency_level == ConsistencyLevel.ANY)
-                    MessagingService.instance().sendRR(hintedMessage, destination, responseHandler);
-                else
-                    MessagingService.instance().sendOneWay(hintedMessage, destination);
+                if (!shouldHint(destination))
+                    continue;
+
+                // Avoid OOMing from hints waiting to be written.  (Unlike ordinary mutations, hints
+                // are not eligible to be dropped if we fall behind.)
+                if (hintsInProgress.get() > maxHintsInProgress)
+                    throw new TimeoutException();
+
+                // Schedule a local hint and let the handler know it needs to wait for the hint to complete too
+                Future<Void> hintfuture = scheduleLocalHint(rm, destination, responseHandler, consistency_level);
+                responseHandler.addFutureForHint(new CreationTimeAwareFuture<Void>(hintfuture));
             }
         }
+
         sendMessages(localDataCenter, dcMessages, responseHandler);
     }
 
+    /**
+     * Asynchronously applies a hint for {@code target} on the local MUTATION stage.
+     *
+     * The hint is counted in hintsInProgress from submission until the task finishes. That count
+     * is what lets sendToHintedEndpoints refuse new hints when too many are outstanding. Under
+     * CL.ANY, a stored hint also counts as a response towards the write's consistency level.
+     *
+     * @param mutation the mutation to store as a hint
+     * @param target the unavailable endpoint the hint is for; must not be this node
+     * @param responseHandler receives response(null) under CL.ANY once the hint is applied; may be null
+     * @param consistencyLevel consistency level of the originating write
+     * @return a future that completes when the hint task has finished
+     * @throws RuntimeException whatever the stage's submit() throws (e.g. once it has been shut
+     *         down); the in-progress slot is released before rethrowing
+     */
+    public static Future<Void> scheduleLocalHint(final RowMutation mutation,
+                                                 final InetAddress target,
+                                                 final IWriteResponseHandler responseHandler,
+                                                 final ConsistencyLevel consistencyLevel)
+    throws IOException
+    {
+        // Hint of itself doesn't make sense.
+        assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
+
+        Runnable runnable = new Runnable()
+        {
+            public void run()
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("Adding hint for " + target);
+
+                try
+                {
+                    RowMutation hintedMutation = RowMutation.hintFor(mutation, ByteBufferUtil.bytes(target.getHostAddress()));
+                    hintedMutation.apply();
+
+                    totalHints.incrementAndGet();
+
+                    // Notify the handler only for CL == ANY
+                    if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY)
+                        responseHandler.response(null);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                finally
+                {
+                    // release the slot taken at submission, whether or not the hint was written
+                    hintsInProgress.decrementAndGet();
+                }
+            }
+        };
+
+        // Take the slot before submitting: the task releases it in its finally block and may
+        // start running before submit() returns.
+        hintsInProgress.incrementAndGet();
+        try
+        {
+            return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable);
+        }
+        catch (RuntimeException e)
+        {
+            // The task will never run (e.g. the stage has been shut down), so its finally block
+            // will never release the slot. Release it here, or hintsInProgress stays inflated forever.
+            hintsInProgress.decrementAndGet();
+            throw e;
+        }
+    }
+
     /**
      * for each datacenter, send a message to one node to relay the write to other replicas
      */
@@ -339,19 +422,6 @@ public class StorageProxy implements Sto
         }
     }
 
-    private static void addHintHeader(Message message, InetAddress target) throws IOException
-    {
-    	FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(bos);
-        byte[] previousHints = message.getHeader(RowMutation.HINT);
-        if (previousHints != null)
-        {
-            dos.write(previousHints);
-        }
-        ByteBufferUtil.writeWithShortLength(ByteBufferUtil.bytes(target.getHostAddress()), dos);
-        message.setHeader(RowMutation.HINT, bos.toByteArray());
-    }
-
     private static void insertLocal(final RowMutation rm, final IWriteResponseHandler responseHandler)
     {
         if (logger.isDebugEnabled())
@@ -395,8 +465,8 @@ public class StorageProxy implements Sto
             String table = cm.getTable();
             AbstractReplicationStrategy rs = Table.open(table).getReplicationStrategy();
             Collection<InetAddress> writeEndpoints = getWriteEndpoints(table, cm.key());
-            Multimap<InetAddress, InetAddress> hintedEndpoints = rs.getHintedEndpoints(writeEndpoints);
-            rs.getWriteResponseHandler(writeEndpoints, hintedEndpoints, cm.consistency()).assureSufficientLiveNodes();
+
+            rs.getWriteResponseHandler(writeEndpoints, cm.consistency()).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
             IWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
@@ -432,7 +502,11 @@ public class StorageProxy implements Sto
         return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer);
     }
 
-    private static Runnable counterWriteTask(final IMutation mutation, final Multimap<InetAddress, InetAddress> hintedEndpoints, final IWriteResponseHandler responseHandler, final String localDataCenter, final ConsistencyLevel consistency_level)
+    private static Runnable counterWriteTask(final IMutation mutation, 
+                                             final Collection<InetAddress> targets,
+                                             final IWriteResponseHandler responseHandler,
+                                             final String localDataCenter,
+                                             final ConsistencyLevel consistency_level)
     {
         return new DroppableRunnable(StorageService.Verb.MUTATION)
         {
@@ -446,18 +520,17 @@ public class StorageProxy implements Sto
                 responseHandler.response(null);
 
                 // then send to replicas, if any
-                InetAddress local = FBUtilities.getBroadcastAddress();
-                hintedEndpoints.remove(local, local);
-                if (cm.shouldReplicateOnWrite() && !hintedEndpoints.isEmpty())
+                targets.remove(FBUtilities.getBroadcastAddress());
+                if (cm.shouldReplicateOnWrite() && !targets.isEmpty())
                 {
                     // We do the replication on another stage because it involves a read (see CM.makeReplicationMutation)
                     // and we want to avoid blocking too much the MUTATION stage
                     StageManager.getStage(Stage.REPLICATE_ON_WRITE).execute(new DroppableRunnable(StorageService.Verb.READ)
                     {
-                        public void runMayThrow() throws IOException
+                        public void runMayThrow() throws IOException, TimeoutException
                         {
                             // send mutation to other replica
-                            sendToHintedEndpoints(cm.makeReplicationMutation(), hintedEndpoints, responseHandler, localDataCenter, consistency_level);
+                            sendToHintedEndpoints(cm.makeReplicationMutation(), targets, responseHandler, localDataCenter, consistency_level);
                         }
                     });
                 }
@@ -1067,11 +1140,6 @@ public class StorageProxy implements Sto
         hintedHandoffEnabled = b;
     }
 
-    public static boolean isHintedHandoffEnabled()
-    {
-        return hintedHandoffEnabled;
-    }
-
     public int getMaxHintWindow()
     {
         return maxHintWindow;
@@ -1084,7 +1152,13 @@ public class StorageProxy implements Sto
 
     public static boolean shouldHint(InetAddress ep)
     {
-        return Gossiper.instance.getEndpointDowntime(ep) <= maxHintWindow;
+        if (!hintedHandoffEnabled)
+            return false;
+        
+        boolean hintWindowExpired = Gossiper.instance.getEndpointDowntime(ep) > maxHintWindow;
+        if (hintWindowExpired)
+            logger.debug("not hinting {} which has been down {}ms", ep, Gossiper.instance.getEndpointDowntime(ep));
+        return !hintWindowExpired;
     }
 
     /**
@@ -1136,7 +1210,7 @@ public class StorageProxy implements Sto
 
+    /** Delivery strategy that performWrite invokes after the live-node check has passed. */
     private interface WritePerformer
     {
-        public void apply(IMutation mutation, Multimap<InetAddress, InetAddress> hintedEndpoints, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException;
+        /**
+         * Delivers {@code mutation} to {@code targets}, either directly or via a task submitted to a
+         * stage, and reports acknowledgements to {@code responseHandler}.
+         *
+         * @throws TimeoutException (standard performer) if a hint cannot be scheduled because too
+         *         many hints are already in progress
+         */
+        public void apply(IMutation mutation, Collection<InetAddress> targets, IWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level) throws IOException, TimeoutException;
     }
 
     private static abstract class DroppableRunnable implements Runnable
@@ -1169,4 +1243,30 @@ public class StorageProxy implements Sto
 
         abstract protected void runMayThrow() throws Exception;
     }
+
+    /** Number of hints this node has successfully applied since the counter was created. */
+    public long getTotalHints()
+    {
+        return totalHints.get();
+    }
+
+    /** Ceiling on outstanding hint writes; once exceeded, writes that need a hint time out. */
+    public int getMaxHintsInProgress()
+    {
+        return maxHintsInProgress;
+    }
+
+    /** Adjusts the outstanding-hint ceiling at runtime (exposed through StorageProxyMBean). */
+    public void setMaxHintsInProgress(int qs)
+    {
+        maxHintsInProgress = qs;
+    }
+
+    /** Number of hint writes that have been scheduled but have not finished yet. */
+    public int getHintsInProgress()
+    {
+        return hintsInProgress.get();
+    }
+
+    /**
+     * Logs a warning if any hint writes are still outstanding. Called during shutdown after the
+     * mutation stage has been drained.
+     */
+    public void verifyNoHintsInProgress()
+    {
+        int outstanding = hintsInProgress.get();
+        if (outstanding <= 0)
+            return;
+        logger.warn("Some hints were not written before shutdown.  This is not supposed to happen.  You should (a) run repair, and (b) file a bug report");
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxyMBean.java Wed Aug 31 19:33:03 2011
@@ -38,8 +38,12 @@ public interface StorageProxyMBean
     public long[] getTotalWriteLatencyHistogramMicros();
     public long[] getRecentWriteLatencyHistogramMicros();
 
+    public long getTotalHints();
     public boolean getHintedHandoffEnabled();
     public void setHintedHandoffEnabled(boolean b);
     public int getMaxHintWindow();
     public void setMaxHintWindow(int ms);
+    public int getMaxHintsInProgress();
+    public void setMaxHintsInProgress(int qs);
+    public int getHintsInProgress();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Wed Aug 31 19:33:03 2011
@@ -405,12 +405,16 @@ public class StorageService implements I
             public void runMayThrow() throws ExecutionException, InterruptedException, IOException
             {
                 ThreadPoolExecutor mutationStage = StageManager.getStage(Stage.MUTATION);
-                if (!mutationStage.isShutdown())
-                {
-                    mutationStage.shutdown();
-                    mutationStage.awaitTermination(1, TimeUnit.SECONDS);
-                    CommitLog.instance.shutdownBlocking();
-                }
+                if (mutationStage.isShutdown())
+                    return; // drained already
+
+                Gossiper.instance.stop();
+                MessagingService.instance().shutdown();
+
+                mutationStage.shutdown();
+                mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+
+                StorageProxy.instance.verifyNoHintsInProgress();
 
                 List<Future<?>> flushes = new ArrayList<Future<?>>();
                 for (Table table : Table.all())
@@ -428,6 +432,8 @@ public class StorageService implements I
                 }
                 FBUtilities.waitOnFutures(flushes);
 
+                CommitLog.instance.shutdownBlocking();
+                
                 // wait for miscellaneous tasks like sstable and commitlog segment deletion
                 tasks.shutdown();
                 if (!tasks.awaitTermination(1, TimeUnit.MINUTES))
@@ -2257,14 +2263,15 @@ public class StorageService implements I
         Gossiper.instance.stop();
         setMode("Draining: shutting down MessageService", false);
         MessagingService.instance().shutdown();
-        setMode("Draining: emptying MessageService pools", false);
-        MessagingService.instance().waitFor();
+        setMode("Draining: waiting for streaming", false);
+        MessagingService.instance().waitForStreaming();
 
         setMode("Draining: clearing mutation stage", false);
         mutationStage.shutdown();
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 
-        // lets flush.
+        StorageProxy.instance.verifyNoHintsInProgress();
+
         setMode("Draining: flushing column families", false);
         List<ColumnFamilyStore> cfses = new ArrayList<ColumnFamilyStore>();
         for (String tableName : Schema.instance.getNonSystemTables())

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/WriteResponseHandler.java Wed Aug 31 19:33:03 2011
@@ -23,15 +23,15 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
@@ -42,23 +42,21 @@ public class WriteResponseHandler extend
 
     protected final AtomicInteger responses;
 
+    /**
+     * Creates a handler for a write sent to writeEndpoints at the given consistency level.
+     * The responses counter starts at determineBlockFor(table); presumably this is the number of
+     * acknowledgements still required before the write counts as successful -- confirm.
+     *
+     * @param writeEndpoints the replicas the write is sent to
+     * @param consistencyLevel the consistency level requested by the client
+     * @param table name passed to determineBlockFor(table)
+     */
-    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
-        super(writeEndpoints, hintedEndpoints, consistencyLevel);
+        super(writeEndpoints, consistencyLevel);
         responses = new AtomicInteger(determineBlockFor(table));
     }
 
+    /**
+     * Creates a handler for a write sent to a single endpoint at ConsistencyLevel.ALL;
+     * the responses counter starts at 1.
+     *
+     * @param endpoint the only replica written to
+     */
     protected WriteResponseHandler(InetAddress endpoint)
     {
-        super(Arrays.asList(endpoint),
-              ImmutableMultimap.<InetAddress, InetAddress>builder().put(endpoint, endpoint).build(),
-              ConsistencyLevel.ALL);
+        super(Arrays.asList(endpoint), ConsistencyLevel.ALL);
         responses = new AtomicInteger(1);
     }
 
+    /**
+     * Factory for the multi-endpoint constructor.
+     *
+     * @param writeEndpoints the replicas the write is sent to
+     * @param consistencyLevel the consistency level requested by the client
+     * @param table name passed through to the constructor (used by determineBlockFor)
+     * @return a new WriteResponseHandler
+     */
-    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, Multimap<InetAddress, InetAddress> hintedEndpoints, ConsistencyLevel consistencyLevel, String table)
+    public static IWriteResponseHandler create(Collection<InetAddress> writeEndpoints, ConsistencyLevel consistencyLevel, String table)
     {
-        return new WriteResponseHandler(writeEndpoints, hintedEndpoints, consistencyLevel, table);
+        return new WriteResponseHandler(writeEndpoints, consistencyLevel, table);
     }
 
     public static IWriteResponseHandler create(InetAddress endpoint)
@@ -97,17 +95,19 @@ public class WriteResponseHandler extend
     {
         if (consistencyLevel == ConsistencyLevel.ANY)
         {
-            // ensure there are blockFor distinct living nodes (hints are ok).
-            if (hintedEndpoints.keySet().size() < responses.get())
+            // For ANY, a hint stored locally on the coordinator is acceptable, so endpoints are counted
+            // without checking liveness; the local node (coordinator) counts as one extra endpoint
+            // unless it is already among writeEndpoints.
+            int effectiveEndpoints = writeEndpoints.contains(FBUtilities.getBroadcastAddress()) ? writeEndpoints.size() : writeEndpoints.size() + 1;
+            if (effectiveEndpoints < responses.get())
                 throw new UnavailableException();
             return;
         }
 
         // count destinations that are part of the desired target set
         int liveNodes = 0;
-        for (InetAddress destination : hintedEndpoints.keySet())
+        for (InetAddress destination : writeEndpoints)
         {
-            if (writeEndpoints.contains(destination))
+            if (FailureDetector.instance.isAlive(destination))
                 liveNodes++;
         }
         if (liveNodes < responses.get())

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/ExpiringMap.java Wed Aug 31 19:33:03 2011
@@ -26,8 +26,6 @@ import org.cliffc.high_scale_lib.NonBloc
 
 public class ExpiringMap<K, V>
 {
-    private final Function<Pair<K,V>, ?> postExpireHook;
-
     private static class CacheableObject<T>
     {
         private final T value;
@@ -53,24 +51,6 @@ public class ExpiringMap<K, V>
         }
     }
 
-    private class CacheMonitor extends TimerTask
-    {
-
-        public void run()
-        {
-            long start = System.currentTimeMillis();
-            for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
-            {
-                if (entry.getValue().isReadyToDie(start))
-                {
-                    cache.remove(entry.getKey());
-                    if (postExpireHook != null)
-                        postExpireHook.apply(new Pair<K, V>(entry.getKey(), entry.getValue().getValue()));
-                }
-            }
-        }
-    }
-
     private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>();
     private final Timer timer;
     private static int counter = 0;
@@ -85,9 +65,8 @@ public class ExpiringMap<K, V>
      *
      * @param expiration the TTL for objects in the cache in milliseconds
      */
-    public ExpiringMap(long expiration, Function<Pair<K,V>, ?> postExpireHook)
+    public ExpiringMap(long expiration, final Function<Pair<K,V>, ?> postExpireHook)
     {
-        this.postExpireHook = postExpireHook;
         this.expiration = expiration;
 
         if (expiration <= 0)
@@ -96,14 +75,46 @@ public class ExpiringMap<K, V>
         }
 
         timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
-        timer.schedule(new CacheMonitor(), expiration / 2, expiration / 2);
+        TimerTask task = new TimerTask()
+        {
+            /**
+             * Evicts every entry whose TTL has passed and runs the post-expire hook for it.
+             * Eviction uses a conditional remove so the hook fires only when this task actually
+             * removed the value; an entry removed or replaced concurrently by another thread is skipped.
+             */
+            public void run()
+            {
+                long start = System.currentTimeMillis();
+                for (Map.Entry<K, CacheableObject<V>> entry : cache.entrySet())
+                {
+                    CacheableObject<V> cached = entry.getValue();
+                    if (!cached.isReadyToDie(start))
+                        continue;
+                    // remove(key, value) fails if the mapping changed since we read it, so only the
+                    // thread that wins the removal invokes the hook (no hook for already-removed entries)
+                    if (cache.remove(entry.getKey(), cached) && postExpireHook != null)
+                        postExpireHook.apply(new Pair<K, V>(entry.getKey(), cached.getValue()));
+                }
+            }
+        };
+        timer.schedule(task, expiration / 2, expiration / 2);
     }
 
+    /**
+     * Waits until the map is empty, then cancels the expiration timer.
+     * The timer is still running during the wait, so expired entries keep being evicted
+     * (and the post-expire hook keeps firing) while this method polls every 100ms.
+     * NOTE(review): the wait has no upper bound; if entries keep being added, or are put with a
+     * very long expiration, this call will not return promptly.
+     */
     public void shutdown()
     {
+        while (!cache.isEmpty())
+        {
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+                // interruption is treated as a programming error; the interrupt flag is not restored
+                throw new AssertionError(e);
+            }
+        }
         timer.cancel();
     }
 
+    /**
+     * Discards every entry immediately. Entries dropped this way are not passed to the
+     * post-expire hook, which only runs from the expiration timer task.
+     */
+    public void clear()
+    {
+        this.cache.clear();
+    }
+
     public V put(K key, V value)
     {
         return put(key, value, this.expiration);

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Wed Aug 31 19:33:03 2011
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.IRowCacheProvider;
+import org.apache.cassandra.concurrent.CreationTimeAwareFuture;
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
@@ -584,6 +585,36 @@ public class FBUtilities
             result.get(ms, TimeUnit.MILLISECONDS);
     }
 
+
+    /**
+     * Waits for each of the given futures to complete, in list order.
+     *
+     * @param hintFutures the futures to wait on
+     * @param timeout the timeout passed to each future's <code>get</code>, in <code>timeUnit</code> units.
+     *                NOTE(review): CreationTimeAwareFuture presumably measures this from the future's
+     *                creation rather than from this call -- confirm.
+     * @param timeUnit unit of <code>timeout</code>
+     * @throws TimeoutException if any future does not complete within <code>timeout</code>
+     * @throws RuntimeException wrapping the ExecutionException if any future completed exceptionally;
+     *                          an interrupt while waiting is rethrown as an AssertionError
+     */
+    public static void waitOnFutures(List<CreationTimeAwareFuture<?>> hintFutures, long timeout, TimeUnit timeUnit) throws TimeoutException
+    {
+        for (Future<?> future : hintFutures)
+        {
+            try
+            {
+                future.get(timeout, timeUnit);
+            }
+            catch (InterruptedException ex)
+            {
+                throw new AssertionError(ex);
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+            // TimeoutException is declared and propagates to the caller unchanged
+        }
+    }
+
     public static IPartitioner newPartitioner(String partitionerClassName) throws ConfigurationException
     {
         if (!partitionerClassName.contains("."))

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/ConsistencyLevelTest.java Wed Aug 31 19:33:03 2011
@@ -25,7 +25,6 @@ import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.google.common.collect.HashMultimap;
 import org.apache.cassandra.config.Schema;
 import org.junit.Test;
 
@@ -63,13 +62,11 @@ public class ConsistencyLevelTest extend
 
         ArrayList<Token> endpointTokens = new ArrayList<Token>();
         ArrayList<Token> keyTokens = new ArrayList<Token>();
+        List<InetAddress> hostsInUse = new ArrayList<InetAddress>();
         List<InetAddress> hosts = new ArrayList<InetAddress>();
 
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, RING_SIZE);
 
-        HashMultimap<InetAddress, InetAddress> hintedNodes = HashMultimap.create();
-
-
         AbstractReplicationStrategy strategy;
 
         for (final String table : Schema.instance.getNonSystemTables())
@@ -88,14 +85,19 @@ public class ConsistencyLevelTest extend
 
                 for (int i = 0; i < replicationFactor; i++)
                 {
-                    hintedNodes.clear();
+                    hostsInUse.clear();
+                    for (int j = 0 ; j < i ; j++)
+                    {
+                        hostsInUse.add(hosts.get(j));
+                    }
 
-                    for (int j = 0; j < i; j++)
+                    if (hostsInUse.isEmpty())
                     {
-                        hintedNodes.put(hosts.get(j), hosts.get(j));
+                        // We skip this case as it means RF = 0 in this simulation.
+                        continue;
                     }
 
-                    IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hosts, hintedNodes, c);
+                    IWriteResponseHandler writeHandler = strategy.getWriteResponseHandler(hostsInUse, c);
 
                     IReadCommand command = new IReadCommand()
                     {
@@ -105,7 +107,7 @@ public class ConsistencyLevelTest extend
                         }
                     };
                     RowRepairResolver resolver = new RowRepairResolver(table, ByteBufferUtil.bytes("foo"));
-                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, new ArrayList<InetAddress>(hintedNodes.keySet()));
+                    ReadCallback<Row> readHandler = StorageProxy.getReadCallback(resolver, command, c, hostsInUse);
 
                     boolean isWriteUnavailable = false;
                     boolean isReadUnavailable = false;
@@ -128,41 +130,42 @@ public class ConsistencyLevelTest extend
                     }
 
                     //these should always match (in this kind of test)
-                    assertTrue(isWriteUnavailable == isReadUnavailable);
+                    assertTrue(String.format("Node Alive: %d - CL: %s - isWriteUnavailable: %b - isReadUnavailable: %b", hostsInUse.size(), c, isWriteUnavailable, isReadUnavailable),
+                               isWriteUnavailable == isReadUnavailable);
 
                     switch (c)
                     {
                         case ALL:
                             if (isWriteUnavailable)
-                                assertTrue(hintedNodes.size() < replicationFactor);
+                                assertTrue(hostsInUse.size() < replicationFactor);
                             else
-                                assertTrue(hintedNodes.size() >= replicationFactor);
+                                assertTrue(hostsInUse.size() >= replicationFactor);
 
                             break;
                         case ONE:
                         case ANY:
                             if (isWriteUnavailable)
-                                assertTrue(hintedNodes.size() == 0);
+                                assertTrue(hostsInUse.size() == 0);
                             else
-                                assertTrue(hintedNodes.size() > 0);
+                                assertTrue(hostsInUse.size() > 0);
                             break;
                         case TWO:
                             if (isWriteUnavailable)
-                                assertTrue(hintedNodes.size() < 2);
+                                assertTrue(hostsInUse.size() < 2);
                             else
-                                assertTrue(hintedNodes.size() >= 2);
+                                assertTrue(hostsInUse.size() >= 2);
                             break;
                         case THREE:
                             if (isWriteUnavailable)
-                                assertTrue(hintedNodes.size() < 3);
+                                assertTrue(hostsInUse.size() < 3);
                             else
-                                assertTrue(hintedNodes.size() >= 3);
+                                assertTrue(hostsInUse.size() >= 3);
                             break;
                         case QUORUM:
                             if (isWriteUnavailable)
-                                assertTrue(hintedNodes.size() < (replicationFactor / 2 + 1));
+                                assertTrue(hostsInUse.size() < (replicationFactor / 2 + 1));
                             else
-                                assertTrue(hintedNodes.size() >= (replicationFactor / 2 + 1));
+                                assertTrue(hostsInUse.size() >= (replicationFactor / 2 + 1));
                             break;
                         default:
                             fail("Unhandled CL: " + c);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java?rev=1163760&r1=1163759&r2=1163760&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/RemoveTest.java Wed Aug 31 19:33:03 2011
@@ -87,6 +87,7 @@ public class RemoveTest extends CleanupH
     public void tearDown()
     {
         SinkManager.clear();
+        // Drop callbacks the test left registered before shutting MessagingService down.
+        // NOTE(review): presumably shutdown() waits for outstanding callbacks to drain (cf. ExpiringMap.shutdown,
+        // which loops until its map is empty), so leftover callbacks would stall teardown -- confirm.
+        MessagingService.instance().clearCallbacksUnsafe();
         MessagingService.instance().shutdown();
         ss.setPartitionerUnsafe(oldPartitioner);
     }



Mime
View raw message