cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject cassandra git commit: Add WriteFailureException to native protocol
Date Thu, 12 Mar 2015 16:41:22 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e5d119aab -> c059a5689


Add WriteFailureException to native protocol

Patch by Stefania Alborghetti; reviewed by Tyler Hobbs for
CASSANDRA-8592


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c059a568
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c059a568
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c059a568

Branch: refs/heads/trunk
Commit: c059a56890dd7b9aca0addca75a05bcce6b65a77
Parents: e5d119a
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Thu Mar 12 11:40:22 2015 -0500
Committer: Tyler Hobbs <tylerhobbs@apache.org>
Committed: Thu Mar 12 11:40:22 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 doc/native_protocol_v4.spec                     |  32 +++-
 .../apache/cassandra/db/BatchlogManager.java    |  36 +++--
 .../db/CounterMutationVerbHandler.java          |  40 ++---
 .../cassandra/db/HintedHandOffManager.java      |  12 +-
 src/java/org/apache/cassandra/db/Keyspace.java  |   6 +
 .../cassandra/db/MutationVerbHandler.java       |  15 +-
 .../cassandra/exceptions/ExceptionCode.java     |   1 +
 .../exceptions/RequestFailureException.java     |   2 +-
 .../exceptions/WriteFailureException.java       |  32 ++++
 .../locator/AbstractReplicationStrategy.java    |  12 +-
 .../org/apache/cassandra/net/CallbackInfo.java  |   6 +-
 .../org/apache/cassandra/net/IVerbHandler.java  |   4 +-
 .../cassandra/net/MessageDeliveryTask.java      |  37 +++--
 .../apache/cassandra/net/MessagingService.java  |   4 +-
 .../apache/cassandra/net/WriteCallbackInfo.java |   2 +-
 .../service/AbstractWriteResponseHandler.java   |  57 ++++++-
 .../DatacenterSyncWriteResponseHandler.java     |   4 +-
 .../service/DatacenterWriteResponseHandler.java |  12 +-
 .../apache/cassandra/service/StorageProxy.java  | 157 ++++++++++++-------
 .../cassandra/service/WriteResponseHandler.java |   4 +-
 .../transport/messages/ErrorMessage.java        |  31 +++-
 .../apache/cassandra/db/SerializationsTest.java |   4 +-
 23 files changed, 354 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c67acd1..49f6358 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 3.0
+ * Add WriteFailureException to native protocol, notify coordinator of
+   write failures (CASSANDRA-8592)
  * Convert SequentialWriter to nio (CASSANDRA-8709)
  * Add role based access control (CASSANDRA-7653, 8650, 7216, 8760, 8849, 8761, 8850)
  * Record client ip address in tracing sessions (CASSANDRA-8162)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/doc/native_protocol_v4.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v4.spec b/doc/native_protocol_v4.spec
index 03a5a50..69adc17 100644
--- a/doc/native_protocol_v4.spec
+++ b/doc/native_protocol_v4.spec
@@ -1010,9 +1010,9 @@ Table of Contents
                              - "BATCH": the write was a (logged) batch write.
                                If this type is received, it means the batch log
                                has been successfully written (otherwise a
-                               "BATCH_LOG" type would have been send instead).
+                               "BATCH_LOG" type would have been sent instead).
                              - "UNLOGGED_BATCH": the write was an unlogged
-                               batch. Not batch log write has been attempted.
+                               batch. No batch log write has been attempted.
                              - "COUNTER": the write was a counter write
                                (batched or not).
                              - "BATCH_LOG": the timeout occured during the
@@ -1058,6 +1058,34 @@ Table of Contents
                 <keyspace> is the keyspace [string] of the failed function
                 <function> is the name [string] of the failed function
                 <arg_types> [string list] one string for each argument type (as CQL type) of the failed function
+    0x1500    Write_failure: A non-timeout exception during a write request. The rest
+              of the ERROR message body will be
+                <cl><received><blockfor><numfailures><write_type>
+              where:
+                <cl> is the [consistency] level of the query having triggered
+                     the exception.
+                <received> is an [int] representing the number of nodes having
+                           answered the request.
+                <blockfor> is an [int] representing the number of replicas whose
+                           response is required to achieve <cl>.
+                <numfailures> is an [int] representing the number of nodes that
+                              experienced a failure while executing the request.
+                <write_type> is a [string] that describes the type of the write
+                             that failed. The value of that string can be one
+                             of:
+                             - "SIMPLE": the write was a non-batched
+                               non-counter write.
+                             - "BATCH": the write was a (logged) batch write.
+                               If this type is received, it means the batch log
+                               has been successfully written (otherwise a
+                               "BATCH_LOG" type would have been sent instead).
+                             - "UNLOGGED_BATCH": the write was an unlogged
+                               batch. No batch log write has been attempted.
+                             - "COUNTER": the write was a counter write
+                               (batched or not).
+                             - "BATCH_LOG": the failure occurred during the
+                               write to the batch log when a (logged) batch
+                               write was requested.
 
     0x2000    Syntax_error: The submitted query has a syntax error.
     0x2100    Unauthorized: The logged user doesn't have the right to perform

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index e71a62c..8eaea52 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -31,7 +31,7 @@ import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,9 +41,11 @@ import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -54,7 +56,6 @@ import org.apache.cassandra.service.WriteResponseHandler;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.WrappedRunnable;
-
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 public class BatchlogManager implements BatchlogManagerMBean
@@ -264,7 +265,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         private final ByteBuffer data;
         private final int version;
 
-        private List<ReplayWriteResponseHandler> replayHandlers;
+        private List<ReplayWriteResponseHandler<Mutation>> replayHandlers;
 
         public Batch(UUID id, long writtenAt, ByteBuffer data, int version)
         {
@@ -298,14 +299,15 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             for (int i = 0; i < replayHandlers.size(); i++)
             {
-                ReplayWriteResponseHandler handler = replayHandlers.get(i);
+                ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
                 try
                 {
                     handler.get();
                 }
-                catch (WriteTimeoutException e)
+                catch (WriteTimeoutException|WriteFailureException e)
                 {
-                    logger.debug("Timed out replaying a batched mutation to a node, will write a hint");
+                    logger.debug("Failed replaying a batched mutation to a node, will write a hint");
+                    logger.debug("Failure was : {}", e.getMessage());
                     // writing hints for the rest to hints, starting from i
                     writeHintsForUndeliveredEndpoints(i);
                     return;
@@ -348,7 +350,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                 {
                     Mutation undeliveredMutation = replayingMutations.get(i);
                     int ttl = calculateHintTTL(replayingMutations);
-                    ReplayWriteResponseHandler handler = replayHandlers.get(i);
+                    ReplayWriteResponseHandler<Mutation> handler = replayHandlers.get(i);
 
                     if (ttl > 0 && handler != null)
                         for (InetAddress endpoint : handler.undelivered)
@@ -361,12 +363,12 @@ public class BatchlogManager implements BatchlogManagerMBean
             }
         }
 
-        private List<ReplayWriteResponseHandler> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
+        private List<ReplayWriteResponseHandler<Mutation>> sendReplays(List<Mutation> mutations, long writtenAt, int ttl)
         {
-            List<ReplayWriteResponseHandler> handlers = new ArrayList<>(mutations.size());
+            List<ReplayWriteResponseHandler<Mutation>> handlers = new ArrayList<>(mutations.size());
             for (Mutation mutation : mutations)
             {
-                ReplayWriteResponseHandler handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
+                ReplayWriteResponseHandler<Mutation> handler = sendSingleReplayMutation(mutation, writtenAt, ttl);
                 if (handler != null)
                     handlers.add(handler);
             }
@@ -379,7 +381,7 @@ public class BatchlogManager implements BatchlogManagerMBean
          *
          * @return direct delivery handler to wait on or null, if no live nodes found
          */
-        private ReplayWriteResponseHandler sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
+        private ReplayWriteResponseHandler<Mutation> sendSingleReplayMutation(final Mutation mutation, long writtenAt, int ttl)
         {
             Set<InetAddress> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
@@ -399,7 +401,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             if (liveEndpoints.isEmpty())
                 return null;
 
-            ReplayWriteResponseHandler handler = new ReplayWriteResponseHandler(liveEndpoints);
+            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
             MessageOut<Mutation> message = mutation.createMessage();
             for (InetAddress endpoint : liveEndpoints)
                 MessagingService.instance().sendRR(message, endpoint, handler, false);
@@ -418,7 +420,11 @@ public class BatchlogManager implements BatchlogManagerMBean
             return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt);
         }
 
-        private static class ReplayWriteResponseHandler extends WriteResponseHandler
+        /**
+         * A wrapper of WriteResponseHandler that stores the addresses of the endpoints from
+         * which we did not receive a successful reply.
+         */
+        private static class ReplayWriteResponseHandler<T> extends WriteResponseHandler<T>
         {
             private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<InetAddress, Boolean>());
 
@@ -435,9 +441,9 @@ public class BatchlogManager implements BatchlogManagerMBean
             }
 
             @Override
-            public void response(MessageIn m)
+            public void response(MessageIn<T> m)
             {
-                boolean removed = undelivered.remove(m.from);
+                boolean removed = undelivered.remove(m == null ? FBUtilities.getBroadcastAddress() : m.from);
                 assert removed;
                 super.response(m);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d65fbd7..d9ee38a 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -34,31 +34,23 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
 
     public void doVerb(final MessageIn<CounterMutation> message, final int id)
     {
-        try
-        {
-            final CounterMutation cm = message.payload;
-            logger.debug("Applying forwarded {}", cm);
+        final CounterMutation cm = message.payload;
+        logger.debug("Applying forwarded {}", cm);
 
-            String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-            // We should not wait for the result of the write in this thread,
-            // otherwise we could have a distributed deadlock between replicas
-            // running this VerbHandler (see #4578).
-            // Instead, we use a callback to send the response. Note that the callback
-            // will not be called if the request timeout, but this is ok
-            // because the coordinator of the counter mutation will timeout on
-            // it's own in that case.
-            StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable()
-            {
-                public void run()
-                {
-                    MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from);
-                }
-            });
-        }
-        catch (RequestExecutionException e)
+        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
+        // We should not wait for the result of the write in this thread,
+        // otherwise we could have a distributed deadlock between replicas
+        // running this VerbHandler (see #4578).
+        // Instead, we use a callback to send the response. Note that the callback
+        // will not be called if the request times out, but this is ok
+        // because the coordinator of the counter mutation will time out on
+        // its own in that case.
+        StorageProxy.applyCounterMutationOnLeader(cm, localDataCenter, new Runnable()
         {
-            // The coordinator will timeout on it's own so ignore
-            logger.debug("counter error", e);
-        }
+            public void run()
+            {
+                MessagingService.instance().sendReply(new WriteResponse().createMessage(), id, message.from);
+            }
+        });
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 324943a..589958e 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.FailureDetector;
@@ -389,7 +390,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 break;
             }
 
-            List<WriteResponseHandler> responseHandlers = Lists.newArrayList();
+            List<WriteResponseHandler<Mutation>> responseHandlers = Lists.newArrayList();
             for (final Cell hint : hintsPage)
             {
                 // check if hints delivery has been paused during the process
@@ -452,20 +453,21 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                         deleteHint(hostIdBytes, hint.name(), hint.timestamp());
                     }
                 };
-                WriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.SIMPLE, callback);
+                WriteResponseHandler<Mutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.SIMPLE, callback);
                 MessagingService.instance().sendRR(message, endpoint, responseHandler, false);
                 responseHandlers.add(responseHandler);
             }
 
-            for (WriteResponseHandler handler : responseHandlers)
+            for (WriteResponseHandler<Mutation> handler : responseHandlers)
             {
                 try
                 {
                     handler.get();
                 }
-                catch (WriteTimeoutException e)
+                catch (WriteTimeoutException|WriteFailureException e)
                 {
-                    logger.info("Timed out replaying hints to {}; aborting ({} delivered)", endpoint, rowsReplayed);
+                    logger.info("Failed replaying hints to {}; aborting ({} delivered), error : {}",
+                        endpoint, rowsReplayed, e.getMessage());
                     break delivery;
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index e3301b1..b0cc25d 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -58,6 +58,9 @@ public class Keyspace
 
     private static final Logger logger = LoggerFactory.getLogger(Keyspace.class);
 
+    private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks", "");
+    private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
+
     public final KeyspaceMetrics metric;
 
     // It is possible to call Keyspace.open without a running daemon, so it makes sense to ensure
@@ -357,6 +360,9 @@ public class Keyspace
      */
     public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
+        if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
+            throw new RuntimeException("Testing write failures");
+
         try (OpOrder.Group opGroup = writeOrder.start())
         {
             // write the mutation to the commitlog and memtables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index 43ffeae..92bfdb5 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -18,24 +18,20 @@
 package org.apache.cassandra.db;
 
 import java.io.DataInputStream;
+import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.tracing.Tracing;
 
 public class MutationVerbHandler implements IVerbHandler<Mutation>
 {
-    private static final Logger logger = LoggerFactory.getLogger(MutationVerbHandler.class);
+    private static final boolean TEST_FAIL_WRITES = System.getProperty("cassandra.test.fail_writes", "false").equalsIgnoreCase("true");
 
-    public void doVerb(MessageIn<Mutation> message, int id)
+    public void doVerb(MessageIn<Mutation> message, int id)  throws IOException
     {
-        try
-        {
             // Check if there were any forwarding headers in this message
             byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
             InetAddress replyTo;
@@ -55,11 +51,6 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
             WriteResponse response = new WriteResponse();
             Tracing.trace("Enqueuing response to {}", replyTo);
             MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
-        }
-        catch (IOException e)
-        {
-            logger.error("Error in mutation", e);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
index 80cd4df..6ad0577 100644
--- a/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
+++ b/src/java/org/apache/cassandra/exceptions/ExceptionCode.java
@@ -41,6 +41,7 @@ public enum ExceptionCode
     READ_TIMEOUT    (0x1200),
     READ_FAILURE    (0x1300),
     FUNCTION_FAILURE(0x1400),
+    WRITE_FAILURE   (0x1500),
 
     // 2xx: problem validating the request
     SYNTAX_ERROR    (0x2000),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
index 1ff44d9..6b8b40f 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureException.java
@@ -28,7 +28,7 @@ public class RequestFailureException extends RequestExecutionException
 
     protected RequestFailureException(ExceptionCode code, ConsistencyLevel consistency, int received, int failures, int blockFor)
     {
-        super(code, String.format("Operation failed - received %d responses and %d failures.", received, failures));
+        super(code, String.format("Operation failed - received %d responses and %d failures", received, failures));
         this.consistency = consistency;
         this.received = received;
         this.failures = failures;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/WriteFailureException.java b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
new file mode 100644
index 0000000..24de9b1
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/WriteFailureException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
+
+public class WriteFailureException extends RequestFailureException
+{
+    public final WriteType writeType;
+
+    public WriteFailureException(ConsistencyLevel consistency, int received, int failures, int blockFor, WriteType writeType)
+    {
+        super(ExceptionCode.WRITE_FAILURE, consistency, received, failures, blockFor);
+        this.writeType = writeType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index d3b8571..461265c 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -128,18 +128,22 @@ public abstract class AbstractReplicationStrategy
      */
     public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
 
-    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback, WriteType writeType)
+    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+                                                                Collection<InetAddress> pendingEndpoints,
+                                                                ConsistencyLevel consistency_level,
+                                                                Runnable callback,
+                                                                WriteType writeType)
     {
         if (consistency_level.isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
-            return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+            return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
         {
-            return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+            return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
         }
-        return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+        return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
     }
 
     private Keyspace getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/CallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/CallbackInfo.java b/src/java/org/apache/cassandra/net/CallbackInfo.java
index b61210c..ea000ae 100644
--- a/src/java/org/apache/cassandra/net/CallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/CallbackInfo.java
@@ -33,17 +33,13 @@ public class CallbackInfo
     protected final IVersionedSerializer<?> serializer;
     private final boolean failureCallback;
 
-    public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer)
-    {
-        this(target, callback, serializer, false);
-    }
-
     /**
      * Create CallbackInfo without sent message
      *
      * @param target target to send message
      * @param callback
      * @param serializer serializer to deserialize response message
+     * @param failureCallback True when we have a callback to handle failures
      */
     public CallbackInfo(InetAddress target, IAsyncCallback callback, IVersionedSerializer<?> serializer, boolean failureCallback)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/IVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IVerbHandler.java b/src/java/org/apache/cassandra/net/IVerbHandler.java
index 7f835c0..574f30f 100644
--- a/src/java/org/apache/cassandra/net/IVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/IVerbHandler.java
@@ -17,6 +17,8 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.IOException;
+
 /**
  * IVerbHandler provides the method that all verb handlers need to implement.
  * The concrete implementation of this interface would provide the functionality
@@ -34,5 +36,5 @@ public interface IVerbHandler<T>
      * @param message - incoming message that needs handling.
      * @param id
      */
-    public void doVerb(MessageIn<T> message, int id);
+    public void doVerb(MessageIn<T> message, int id) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
index da12d7a..f160464 100644
--- a/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
+++ b/src/java/org/apache/cassandra/net/MessageDeliveryTask.java
@@ -17,12 +17,13 @@
  */
 package org.apache.cassandra.net;
 
+import java.io.IOException;
 import java.util.EnumSet;
 
-import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.filter.TombstoneOverwhelmingException;
 import org.apache.cassandra.gms.Gossiper;
 
 public class MessageDeliveryTask implements Runnable
@@ -62,24 +63,36 @@ public class MessageDeliveryTask implements Runnable
         {
             verbHandler.doVerb(message, id);
         }
+        catch (IOException ioe)
+        {
+            handleFailure(ioe);
+            throw new RuntimeException(ioe);
+        }
+        catch (TombstoneOverwhelmingException toe)
+        {
+            handleFailure(toe);
+            logger.error(toe.getMessage());
+        }
         catch (Throwable t)
         {
-            if (message.doCallbackOnFailure())
-            {
-                MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
-                                                    .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
-                MessagingService.instance().sendReply(response, id, message.from);
-            }
-
-            if (t instanceof TombstoneOverwhelmingException)
-                logger.error(t.getMessage());
-            else
-                throw t;
+            handleFailure(t);
+            throw t;
         }
+
         if (GOSSIP_VERBS.contains(message.verb))
             Gossiper.instance.setLastProcessedMessageAt(constructionTime);
     }
 
+    private void handleFailure(Throwable t)
+    {
+        if (message.doCallbackOnFailure())
+        {
+            MessageOut response = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+                                                .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+            MessagingService.instance().sendReply(response, id, message.from);
+        }
+    }
+
     EnumSet<MessagingService.Verb> GOSSIP_VERBS = EnumSet.of(MessagingService.Verb.GOSSIP_DIGEST_ACK,
                                                              MessagingService.Verb.GOSSIP_DIGEST_ACK2,
                                                              MessagingService.Verb.GOSSIP_DIGEST_SYN);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index c333b04..a5cbfa7 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -639,11 +639,11 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public int sendRR(MessageOut<? extends IMutation> message,
                       InetAddress to,
-                      AbstractWriteResponseHandler handler,
+                      AbstractWriteResponseHandler<? extends IMutation> handler,
                       boolean allowHints)
     {
         int id = addCallback(handler, message, to, message.getTimeout(), handler.consistencyLevel, allowHints);
-        sendOneWay(message, id, to);
+        sendOneWay(message.withParameter(FAILURE_CALLBACK_PARAM, ONE_BYTE), id, to);
         return id;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
index 987ec15..9322631 100644
--- a/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
+++ b/src/java/org/apache/cassandra/net/WriteCallbackInfo.java
@@ -37,7 +37,7 @@ public class WriteCallbackInfo extends CallbackInfo
                              ConsistencyLevel consistencyLevel,
                              boolean allowHints)
     {
-        super(target, callback, serializer);
+        super(target, callback, serializer, true);
         assert message != null;
         this.sentMessage = message;
         this.consistencyLevel = consistencyLevel;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 72e5b9c..8978034 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -29,11 +32,14 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
-public abstract class AbstractWriteResponseHandler implements IAsyncCallback
+public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T>
 {
+    protected static final Logger logger = LoggerFactory.getLogger( AbstractWriteResponseHandler.class );
+
     private final SimpleCondition condition = new SimpleCondition();
     protected final Keyspace keyspace;
     protected final long start;
@@ -42,6 +48,9 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
     protected final Runnable callback;
     protected final Collection<InetAddress> pendingEndpoints;
     private final WriteType writeType;
+    private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
+        = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
+    private volatile int failures = 0;
 
     /**
      * @param callback A callback to be called when the write is successful.
@@ -62,7 +71,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
         this.writeType = writeType;
     }
 
-    public void get() throws WriteTimeoutException
+    public void get() throws WriteTimeoutException, WriteFailureException
     {
         long requestTimeout = writeType == WriteType.COUNTER
                             ? DatabaseDescriptor.getCounterWriteRpcTimeout()
@@ -82,8 +91,8 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
 
         if (!success)
         {
-            int acks = ackCount();
             int blockedFor = totalBlockFor();
+            int acks = ackCount();
             // It's pretty unlikely, but we can race between exiting await above and here, so
             // that we could now have enough acks. In that case, we "lie" on the acks count to
             // avoid sending confusing info to the user (see CASSANDRA-6491).
@@ -91,8 +100,16 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
                 acks = blockedFor - 1;
             throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor);
         }
+
+        if (totalBlockFor() + failures > totalEndpoints())
+        {
+            throw new WriteFailureException(consistencyLevel, ackCount(), failures, totalBlockFor(), writeType);
+        }
     }
 
+    /** 
+     * @return the minimum number of endpoints that must reply. 
+     */
     protected int totalBlockFor()
     {
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
@@ -100,10 +117,29 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
         return consistencyLevel.blockFor(keyspace) + pendingEndpoints.size();
     }
 
+    /** 
+     * @return the total number of endpoints the request has been sent to. 
+     */
+    protected int totalEndpoints()
+    {
+        return naturalEndpoints.size() + pendingEndpoints.size();
+    }
+
+    /**
+     * @return true if a reply or failure from the given endpoint counts towards the totalBlockFor() threshold
+     */
+    protected boolean waitingFor(InetAddress from)
+    {
+        return true;
+    }
+
+    /**
+     * @return number of responses received
+     */
     protected abstract int ackCount();
 
     /** null message means "response from local write" */
-    public abstract void response(MessageIn msg);
+    public abstract void response(MessageIn<T> msg);
 
     public void assureSufficientLiveNodes() throws UnavailableException
     {
@@ -116,4 +152,17 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
         if (callback != null)
             callback.run();
     }
+
+    @Override
+    public void onFailure(InetAddress from)
+    {
+        logger.trace("Got failure from {}", from);
+
+        int n = waitingFor(from)
+              ? failuresUpdater.incrementAndGet(this)
+              : failures;
+
+        if (totalBlockFor() + n > totalEndpoints())
+            signal();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index 511a122..b095c7f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.db.WriteType;
 /**
  * This class blocks for a quorum of responses _in all datacenters_ (CL.EACH_QUORUM).
  */
-public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHandler
+public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
 {
     private static final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 
@@ -68,7 +68,7 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         }
     }
 
-    public void response(MessageIn message)
+    public void response(MessageIn<T> message)
     {
         String dataCenter = message == null
                             ? DatabaseDescriptor.getLocalDataCenter()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index fb8f992..b1b7b10 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.db.WriteType;
 /**
  * This class blocks for a quorum of responses _in the local datacenter only_ (CL.LOCAL_QUORUM).
  */
-public class DatacenterWriteResponseHandler extends WriteResponseHandler
+public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
 {
     public DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
                                           Collection<InetAddress> pendingEndpoints,
@@ -42,9 +42,9 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
     }
 
     @Override
-    public void response(MessageIn message)
+    public void response(MessageIn<T> message)
     {
-        if (message == null || consistencyLevel.isLocal(message.from))
+        if (message == null || waitingFor(message.from))
             super.response(message);
     }
 
@@ -55,4 +55,10 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
         // or we may fail the consistency level guarantees (see #833, #8058)
         return consistencyLevel.blockFor(keyspace) + consistencyLevel.countLocalEndpoints(pendingEndpoints);
     }
+
+    @Override
+    protected boolean waitingFor(InetAddress from)
+    {
+        return consistencyLevel.isLocal(from);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 80b22f4..d667b1e 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,7 +31,6 @@ import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.metrics.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +57,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.*;
 import org.apache.cassandra.sink.SinkManager;
@@ -113,7 +113,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             public void apply(IMutation mutation,
                               Iterable<InetAddress> targets,
-                              AbstractWriteResponseHandler responseHandler,
+                              AbstractWriteResponseHandler<IMutation> responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistency_level)
             throws OverloadedException
@@ -133,7 +133,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             public void apply(IMutation mutation,
                               Iterable<InetAddress> targets,
-                              AbstractWriteResponseHandler responseHandler,
+                              AbstractWriteResponseHandler<IMutation> responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistencyLevel)
             {
@@ -145,7 +145,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             public void apply(IMutation mutation,
                               Iterable<InetAddress> targets,
-                              AbstractWriteResponseHandler responseHandler,
+                              AbstractWriteResponseHandler<IMutation> responseHandler,
                               String localDataCenter,
                               ConsistencyLevel consistencyLevel)
             {
@@ -203,7 +203,7 @@ public class StorageProxy implements StorageProxyMBean
                                    ConsistencyLevel consistencyForPaxos,
                                    ConsistencyLevel consistencyForCommit,
                                    ClientState state)
-    throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, WriteTimeoutException, InvalidRequestException
+    throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
     {
         final long start = System.nanoTime();
         int contentions = 0;
@@ -274,6 +274,11 @@ public class StorageProxy implements StorageProxyMBean
             casWriteMetrics.timeouts.mark();
             throw e;
         }
+        catch (WriteFailureException|ReadFailureException e)
+        {
+            casWriteMetrics.failures.mark();
+            throw e;
+        }
         catch(UnavailableException e)
         {
             casWriteMetrics.unavailables.mark();
@@ -346,7 +351,7 @@ public class StorageProxy implements StorageProxyMBean
                                                            ConsistencyLevel consistencyForCommit,
                                                            final boolean isWrite,
                                                            ClientState state)
-    throws WriteTimeoutException
+    throws WriteTimeoutException, WriteFailureException
     {
         long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
 
@@ -469,7 +474,7 @@ public class StorageProxy implements StorageProxyMBean
         return false;
     }
 
-    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException
+    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel) throws WriteTimeoutException, WriteFailureException
     {
         boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
@@ -478,7 +483,7 @@ public class StorageProxy implements StorageProxyMBean
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspace.getName(), tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspace.getName());
 
-        AbstractWriteResponseHandler responseHandler = null;
+        AbstractWriteResponseHandler<Commit> responseHandler = null;
         if (shouldBlock)
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -491,7 +496,7 @@ public class StorageProxy implements StorageProxyMBean
             if (FailureDetector.instance.isAlive(destination))
             {
                 if (shouldBlock)
-                    MessagingService.instance().sendRR(message, destination, responseHandler);
+                    MessagingService.instance().sendRRWithFailure(message, destination, responseHandler);
                 else
                     MessagingService.instance().sendOneWay(message, destination);
             }
@@ -511,13 +516,13 @@ public class StorageProxy implements StorageProxyMBean
      * @param consistency_level the consistency level for the operation
      */
     public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level)
-    throws UnavailableException, OverloadedException, WriteTimeoutException
+    throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
         final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
 
         long startTime = System.nanoTime();
-        List<AbstractWriteResponseHandler> responseHandlers = new ArrayList<>(mutations.size());
+        List<AbstractWriteResponseHandler<IMutation>> responseHandlers = new ArrayList<>(mutations.size());
 
         try
         {
@@ -535,40 +540,32 @@ public class StorageProxy implements StorageProxyMBean
             }
 
             // wait for writes.  throws TimeoutException if necessary
-            for (AbstractWriteResponseHandler responseHandler : responseHandlers)
+            for (AbstractWriteResponseHandler<IMutation> responseHandler : responseHandlers)
             {
                 responseHandler.get();
             }
         }
-        catch (WriteTimeoutException ex)
+        catch (WriteTimeoutException|WriteFailureException ex)
         {
             if (consistency_level == ConsistencyLevel.ANY)
             {
-                // hint all the mutations (except counters, which can't be safely retried).  This means
-                // we'll re-hint any successful ones; doesn't seem worth it to track individual success
-                // just for this unusual case.
-                for (IMutation mutation : mutations)
-                {
-                    if (mutation instanceof CounterMutation)
-                        continue;
-
-                    Token tk = StorageService.getPartitioner().getToken(mutation.key());
-                    List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
-                    Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
-                    for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
-                    {
-                        // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and
-                        // CASSANDRA-6510), so there is no need to hint or retry
-                        if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target))
-                            submitHint((Mutation) mutation, target, null);
-                    }
-                }
-                Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+                hintMutations(mutations);
             }
             else
             {
-                writeMetrics.timeouts.mark();
-                Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
+                if (ex instanceof WriteFailureException)
+                {
+                    writeMetrics.failures.mark();
+                    WriteFailureException fe = (WriteFailureException)ex;
+                    Tracing.trace("Write failure; received {} of {} required replies, failed {} requests",
+                        new Object[] {fe.received, fe.blockFor, fe.failures});
+                }
+                else
+                {
+                    writeMetrics.timeouts.mark();
+                    WriteTimeoutException te = (WriteTimeoutException)ex;
+                    Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor);
+                }
                 throw ex;
             }
         }
@@ -590,11 +587,39 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
+    /** Hint all the mutations (except counters, which can't be safely retried). This means
+     * we'll re-hint any successful ones; it doesn't seem worth tracking individual successes
+     * just for this unusual case.
+     *
+     * @param mutations the mutations that require hints
+     */
+    private static void hintMutations(Collection<? extends IMutation> mutations)
+    {
+        for (IMutation mutation : mutations)
+        {
+            if (mutation instanceof CounterMutation)
+                continue;
+
+            Token tk = StorageService.getPartitioner().getToken(mutation.key());
+            List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(mutation.getKeyspaceName(), tk);
+            Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, mutation.getKeyspaceName());
+            for (InetAddress target : Iterables.concat(naturalEndpoints, pendingEndpoints))
+            {
+                // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and
+                // CASSANDRA-6510), so there is no need to hint or retry
+                if (!target.equals(FBUtilities.getBroadcastAddress()) && shouldHint(target))
+                    submitHint((Mutation) mutation, target, null);
+            }
+        }
+
+        Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write");
+    }
+
     @SuppressWarnings("unchecked")
     public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
                                           ConsistencyLevel consistencyLevel,
                                           boolean mutateAtomically)
-    throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
+    throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException
     {
         Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
 
@@ -658,6 +683,12 @@ public class StorageProxy implements StorageProxyMBean
             Tracing.trace("Write timeout; received {} of {} required replies", e.received, e.blockFor);
             throw e;
         }
+        catch (WriteFailureException e)
+        {
+            writeMetrics.failures.mark();
+            Tracing.trace("Write failure; received {} of {} required replies", e.received, e.blockFor);
+            throw e;
+        }
         finally
         {
             writeMetrics.addNano(System.nanoTime() - startTime);
@@ -665,9 +696,9 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddress> endpoints, UUID uuid)
-    throws WriteTimeoutException
+    throws WriteTimeoutException, WriteFailureException
     {
-        AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
+        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
                                                                         Collections.<InetAddress>emptyList(),
                                                                         ConsistencyLevel.ONE,
                                                                         Keyspace.open(SystemKeyspace.NAME),
@@ -702,7 +733,7 @@ public class StorageProxy implements StorageProxyMBean
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
     {
-        AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
+        AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
                                                                         Collections.<InetAddress>emptyList(),
                                                                         ConsistencyLevel.ANY,
                                                                         Keyspace.open(SystemKeyspace.NAME),
@@ -747,7 +778,7 @@ public class StorageProxy implements StorageProxyMBean
      * @param callback an optional callback to be run if and when the write is
      * successful.
      */
-    public static AbstractWriteResponseHandler performWrite(IMutation mutation,
+    public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation,
                                                             ConsistencyLevel consistency_level,
                                                             String localDataCenter,
                                                             WritePerformer performer,
@@ -762,7 +793,7 @@ public class StorageProxy implements StorageProxyMBean
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
@@ -779,17 +810,17 @@ public class StorageProxy implements StorageProxyMBean
         Token tk = StorageService.getPartitioner().getToken(mutation.key());
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
         return new WriteResponseHandlerWrapper(responseHandler, mutation);
     }
 
     // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints.
     private static class WriteResponseHandlerWrapper
     {
-        final AbstractWriteResponseHandler handler;
+        final AbstractWriteResponseHandler<IMutation> handler;
         final Mutation mutation;
 
-        WriteResponseHandlerWrapper(AbstractWriteResponseHandler handler, Mutation mutation)
+        WriteResponseHandlerWrapper(AbstractWriteResponseHandler<IMutation> handler, Mutation mutation)
         {
             this.handler = handler;
             this.mutation = mutation;
@@ -841,7 +872,7 @@ public class StorageProxy implements StorageProxyMBean
      */
     public static void sendToHintedEndpoints(final Mutation mutation,
                                              Iterable<InetAddress> targets,
-                                             AbstractWriteResponseHandler responseHandler,
+                                             AbstractWriteResponseHandler<IMutation> responseHandler,
                                              String localDataCenter)
     throws OverloadedException
     {
@@ -933,7 +964,7 @@ public class StorageProxy implements StorageProxyMBean
 
     public static Future<Void> submitHint(final Mutation mutation,
                                           final InetAddress target,
-                                          final AbstractWriteResponseHandler responseHandler)
+                                          final AbstractWriteResponseHandler<IMutation> responseHandler)
     {
         // local write that time out should be handled by LocalMutationRunnable
         assert !target.equals(FBUtilities.getBroadcastAddress()) : target;
@@ -979,7 +1010,9 @@ public class StorageProxy implements StorageProxyMBean
         StorageMetrics.totalHints.inc();
     }
 
-    private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message, Collection<InetAddress> targets, AbstractWriteResponseHandler handler)
+    private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation> message,
+                                                 Collection<InetAddress> targets,
+                                                 AbstractWriteResponseHandler<IMutation> handler)
     {
         Iterator<InetAddress> iter = targets.iterator();
         InetAddress target = iter.next();
@@ -1014,7 +1047,7 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler responseHandler)
+    private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
     {
 
         StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
@@ -1024,8 +1057,16 @@ public class StorageProxy implements StorageProxyMBean
                 IMutation processed = SinkManager.processWriteRequest(mutation);
                 if (processed != null)
                 {
-                    ((Mutation) processed).apply();
-                    responseHandler.response(null);
+                    try 
+                    {
+                        ((Mutation) processed).apply();
+                        responseHandler.response(null);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.error("Failed to apply mutation locally : {}", ex.getMessage());
+                        responseHandler.onFailure(FBUtilities.getBroadcastAddress());
+                    }
                 }
             }
         });
@@ -1045,7 +1086,7 @@ public class StorageProxy implements StorageProxyMBean
      * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
      * the write latencies at the coordinator node to make gathering point similar to the case of standard writes.
      */
-    public static AbstractWriteResponseHandler mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
+    public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
     {
         InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
 
@@ -1065,7 +1106,7 @@ public class StorageProxy implements StorageProxyMBean
             rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER);
+            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER);
 
             Tracing.trace("Enqueuing counter update to {}", endpoint);
             MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false);
@@ -1112,7 +1153,7 @@ public class StorageProxy implements StorageProxyMBean
 
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
-    public static AbstractWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback)
+    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback)
     throws UnavailableException, OverloadedException
     {
         return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER);
@@ -1120,7 +1161,7 @@ public class StorageProxy implements StorageProxyMBean
 
     // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
-    public static AbstractWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
+    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
     throws UnavailableException, OverloadedException
     {
         return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER);
@@ -1128,7 +1169,7 @@ public class StorageProxy implements StorageProxyMBean
 
     private static Runnable counterWriteTask(final IMutation mutation,
                                              final Iterable<InetAddress> targets,
-                                             final AbstractWriteResponseHandler responseHandler,
+                                             final AbstractWriteResponseHandler<IMutation> responseHandler,
                                              final String localDataCenter)
     {
         return new DroppableRunnable(MessagingService.Verb.COUNTER_MUTATION)
@@ -1222,6 +1263,10 @@ public class StorageProxy implements StorageProxyMBean
             {
                 throw new ReadTimeoutException(consistencyLevel, 0, consistencyLevel.blockFor(Keyspace.open(command.ksName)), false);
             }
+            catch (WriteFailureException e)
+            {
+                throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false);
+            }
 
             rows = fetchRows(commands, consistencyForCommitOrFetch);
         }
@@ -2094,7 +2139,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         public void apply(IMutation mutation,
                           Iterable<InetAddress> targets,
-                          AbstractWriteResponseHandler responseHandler,
+                          AbstractWriteResponseHandler<IMutation> responseHandler,
                           String localDataCenter,
                           ConsistencyLevel consistencyLevel) throws OverloadedException;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index df23b19..1dc03e0 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.db.WriteType;
 /**
  * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
  */
-public class WriteResponseHandler extends AbstractWriteResponseHandler
+public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
 {
     protected static final Logger logger = LoggerFactory.getLogger(WriteResponseHandler.class);
 
@@ -63,7 +63,7 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
         this(endpoint, writeType, null);
     }
 
-    public void response(MessageIn m)
+    public void response(MessageIn<T> m)
     {
         if (responsesUpdater.decrementAndGet(this) == 0)
             signal();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 1e9564c..d6d901b 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -76,14 +76,23 @@ public class ErrorMessage extends Message.Response
                 case TRUNCATE_ERROR:
                     te = new TruncateException(msg);
                     break;
+                case WRITE_FAILURE: 
                 case READ_FAILURE:
                     {
                         ConsistencyLevel cl = CBUtil.readConsistencyLevel(body);
                         int received = body.readInt();
                         int blockFor = body.readInt();
                         int failure = body.readInt();
-                        byte dataPresent = body.readByte();
-                        te = new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0);
+                        if (code == ExceptionCode.WRITE_FAILURE)
+                        {
+                            WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body));
+                            te = new WriteFailureException(cl, received, failure, blockFor, writeType);
+                        }
+                        else
+                        {
+                            byte dataPresent = body.readByte();
+                            te = new ReadFailureException(cl, received, failure, blockFor, dataPresent != 0);   
+                        }
                     }
                     break;
                 case WRITE_TIMEOUT:
@@ -152,15 +161,21 @@ public class ErrorMessage extends Message.Response
                     dest.writeInt(ue.required);
                     dest.writeInt(ue.alive);
                     break;
+                case WRITE_FAILURE:
                 case READ_FAILURE:
                     {
                         RequestFailureException rfe = (RequestFailureException)err;
+                        boolean isWrite = err.code() == ExceptionCode.WRITE_FAILURE;
 
                         CBUtil.writeConsistencyLevel(rfe.consistency, dest);
                         dest.writeInt(rfe.received);
                         dest.writeInt(rfe.blockFor);
                         dest.writeInt(rfe.failures);
-                        dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0));
+
+                        if (isWrite)
+                            CBUtil.writeString(((WriteFailureException)rfe).writeType.toString(), dest);
+                        else
+                            dest.writeByte((byte)(((ReadFailureException)rfe).dataPresent ? 1 : 0));
                     }
                     break;
                 case WRITE_TIMEOUT:
@@ -204,10 +219,13 @@ public class ErrorMessage extends Message.Response
                     UnavailableException ue = (UnavailableException)err;
                     size += CBUtil.sizeOfConsistencyLevel(ue.consistency) + 8;
                     break;
+                case WRITE_FAILURE:
                 case READ_FAILURE:
                     {
-                        ReadFailureException rfe = (ReadFailureException)err;
-                        size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4 + 1;
+                        RequestFailureException rfe = (RequestFailureException)err;
+                        boolean isWrite = err.code() == ExceptionCode.WRITE_FAILURE;
+                        size += CBUtil.sizeOfConsistencyLevel(rfe.consistency) + 4 + 4 + 4;
+                        size += isWrite ? CBUtil.sizeOfString(((WriteFailureException)rfe).writeType.toString()) : 1;
                     }
                     break;
                 case WRITE_TIMEOUT:
@@ -246,6 +264,9 @@ public class ErrorMessage extends Message.Response
                 case READ_FAILURE:
                     ReadFailureException rfe = (ReadFailureException) msg.error;
                     return new ReadTimeoutException(rfe.consistency, rfe.received, rfe.blockFor, rfe.dataPresent);
+                case WRITE_FAILURE:
+                    WriteFailureException wfe = (WriteFailureException) msg.error;
+                    return new WriteTimeoutException(wfe.writeType, wfe.consistency, wfe.received, wfe.blockFor);
                 case FUNCTION_FAILURE:
                     return new InvalidRequestException(msg.toString());
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c059a568/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index a720608..f8e757a 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -311,8 +311,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         assert MessageIn.read(in, getVersion(), -1) != null;
 
         // set up some fake callbacks so deserialization knows that what it's deserializing is a TruncateResponse
-        MessagingService.instance().setCallbackForTests(1, new CallbackInfo(null, null, TruncateResponse.serializer));
-        MessagingService.instance().setCallbackForTests(2, new CallbackInfo(null, null, TruncateResponse.serializer));
+        MessagingService.instance().setCallbackForTests(1, new CallbackInfo(null, null, TruncateResponse.serializer, false));
+        MessagingService.instance().setCallbackForTests(2, new CallbackInfo(null, null, TruncateResponse.serializer, false));
 
         assert MessageIn.read(in, getVersion(), 1) != null;
         assert MessageIn.read(in, getVersion(), 2) != null;


Mime
View raw message