cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject git commit: Improve write timeout exceptions
Date Mon, 08 Oct 2012 08:12:48 GMT
Updated Branches:
  refs/heads/trunk 801d7d3f5 -> ee5aafe6a


Improve write timeout exceptions

patch by slebresne; reviewed by jbellis for CASSANDRA-4723


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee5aafe6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee5aafe6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee5aafe6

Branch: refs/heads/trunk
Commit: ee5aafe6a4cb409f16c48edcb3a1682c7d6b400e
Parents: 801d7d3
Author: Sylvain Lebresne <sylvain@datastax.com>
Authored: Tue Sep 25 18:23:05 2012 +0200
Committer: Sylvain Lebresne <sylvain@datastax.com>
Committed: Mon Oct 8 10:11:23 2012 +0200

----------------------------------------------------------------------
 doc/native_protocol.spec                           |   18 ++++-
 .../org/apache/cassandra/db/CounterColumn.java     |    2 +-
 .../apache/cassandra/db/HintedHandOffManager.java  |    2 +-
 src/java/org/apache/cassandra/db/WriteType.java    |   27 ++++++
 .../exceptions/WriteTimeoutException.java          |    7 +-
 .../locator/AbstractReplicationStrategy.java       |    9 +-
 .../service/AbstractWriteResponseHandler.java      |   14 +++-
 .../DatacenterSyncWriteResponseHandler.java        |   15 ++--
 .../service/DatacenterWriteResponseHandler.java    |   15 ++--
 .../org/apache/cassandra/service/StorageProxy.java |   64 +++++---------
 .../cassandra/service/WriteResponseHandler.java    |   24 +++---
 .../apache/cassandra/thrift/ThriftConversion.java  |    9 ++-
 .../cassandra/transport/messages/ErrorMessage.java |   42 ++++++----
 13 files changed, 148 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 71a7c71..a7fa7cb 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -524,7 +524,7 @@ Table of Contents
     0x1003    Truncate_error: error during a truncation error.
     0x1100    Write_timeout: Timeout exception during a write request. The rest
               of the ERROR message body will be
-                <cl><received><blockfor>
+                <cl><received><blockfor><writeType>
               where:
                 <cl> is a [string] representing the consistency level of the
                      query having triggered the exception.
@@ -532,6 +532,22 @@ Table of Contents
                            acknowledged the request.
                 <blockfor> is the number of replica whose acknowledgement is
                            required to achieve <cl>.
+                <writeType> is a [string] that describe the type of the write
+                            that timeouted. The value of that string can be one
+                            of:
+                             - "SIMPLE": the write was a non-batched
+                               non-counter write.
+                             - "BATCH": the write was a (logged) batch write.
+                               If this type is received, it means the batch log
+                               has been successfully written (otherwise a
+                               "BATCH_LOG" type would have been send instead).
+                             - "UNLOGGED_BATCH": the write was an unlogged
+                               batch. Not batch log write has been attempted.
+                             - "COUNTER": the write was a counter write
+                               (batched or not).
+                             - "BATCH_LOG": the timeout occured during the
+                               write to the batch log when a (logged) batch
+                               write was requested.
     0x1200    Read_timeout: Timeout exception during a read request. The rest
               of the ERROR message body will be
                 <cl><received><blockfor><data_present>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/db/CounterColumn.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java
index 31e9c36..1c5fdcd 100644
--- a/src/java/org/apache/cassandra/db/CounterColumn.java
+++ b/src/java/org/apache/cassandra/db/CounterColumn.java
@@ -377,7 +377,7 @@ public class CounterColumn extends Column
                 responseHandler.response(null);
                 StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler,
localDataCenter, consistency_level);
             }
-        }, null);
+        }, null, WriteType.SIMPLE);
 
         // we don't wait for answers
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index e5ff163..119dd16 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -126,7 +126,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     private static void sendMutation(InetAddress endpoint, MessageOut<?> message) throws
WriteTimeoutException
     {
-        AbstractWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
+        AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint,
WriteType.UNLOGGED_BATCH);
         MessagingService.instance().sendRR(message, endpoint, responseHandler);
         responseHandler.get();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/db/WriteType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java
new file mode 100644
index 0000000..b96585d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/WriteType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+public enum WriteType
+{
+    SIMPLE,
+    BATCH,
+    UNLOGGED_BATCH,
+    COUNTER,
+    BATCH_LOG;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
index c6066f6..2802fe9 100644
--- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
+++ b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java
@@ -22,14 +22,15 @@ import java.util.Set;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
 
 public class WriteTimeoutException extends RequestTimeoutException
 {
-    public final boolean writtenToBatchlog;
+    public final WriteType writeType;
 
-    public WriteTimeoutException(ConsistencyLevel consistency, int received, int blockFor,
boolean writtenToBatchlog)
+    public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, int received,
int blockFor)
     {
         super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor);
-        this.writtenToBatchlog = writtenToBatchlog;
+        this.writeType = writeType;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 630aac4..c6adad3 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.RingPosition;
 import org.apache.cassandra.dht.Token;
@@ -114,18 +115,18 @@ public abstract class AbstractReplicationStrategy
      */
     public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken,
TokenMetadata tokenMetadata);
 
-    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress>
naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level,
Runnable callback)
+    public AbstractWriteResponseHandler getWriteResponseHandler(Collection<InetAddress>
naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level,
Runnable callback, WriteType writeType)
     {
         if (consistency_level == ConsistencyLevel.LOCAL_QUORUM)
         {
             // block for in this context will be localnodes block.
-            return DatacenterWriteResponseHandler.create(naturalEndpoints, pendingEndpoints,
consistency_level, table, callback);
+            return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints,
consistency_level, table, callback, writeType);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM)
         {
-            return DatacenterSyncWriteResponseHandler.create(naturalEndpoints, pendingEndpoints,
consistency_level, table, callback);
+            return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints,
consistency_level, table, callback, writeType);
         }
-        return WriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level,
table, callback);
+        return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level,
table, callback, writeType);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 76eeb0d..2ad2849 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -22,11 +22,11 @@ import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.utils.SimpleCondition;
 
 public abstract class AbstractWriteResponseHandler implements IAsyncCallback
@@ -37,18 +37,24 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
     protected final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
     protected final Collection<InetAddress> pendingEndpoints;
+    private final WriteType writeType;
 
     /**
      * @param pendingEndpoints
      * @param callback A callback to be called when the write is successful.
      */
-    protected AbstractWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, Runnable
callback)
+    protected AbstractWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+                                           Collection<InetAddress> pendingEndpoints,
+                                           ConsistencyLevel consistencyLevel,
+                                           Runnable callback,
+                                           WriteType writeType)
     {
         this.pendingEndpoints = pendingEndpoints;
         startTime = System.currentTimeMillis();
         this.consistencyLevel = consistencyLevel;
         this.naturalEndpoints = naturalEndpoints;
         this.callback = callback;
+        this.writeType = writeType;
     }
 
     public void get() throws WriteTimeoutException
@@ -66,7 +72,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback
         }
 
         if (!success)
-            throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor(), false);
+            throw new WriteTimeoutException(writeType, consistencyLevel, ackCount(), blockFor());
     }
 
     protected abstract int ackCount();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index ffee975..995b5bf 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -52,10 +53,15 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
     private final NetworkTopologyStrategy strategy;
     private final HashMap<String, AtomicInteger> responses = new HashMap<String,
AtomicInteger>();
 
-    protected DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String
table, Runnable callback)
+    public DatacenterSyncWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+                                              Collection<InetAddress> pendingEndpoints,
+                                              ConsistencyLevel consistencyLevel,
+                                              String table,
+                                              Runnable callback,
+                                              WriteType writeType)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback);
+        super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
         assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
 
         this.table = table;
@@ -68,11 +74,6 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan
         }
     }
 
-    public static AbstractWriteResponseHandler create(Collection<InetAddress> naturalEndpoints,
Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String
table, Runnable callback)
-    {
-        return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints,
consistencyLevel, table, callback);
-    }
-
     public void response(MessageIn message)
     {
         String dataCenter = message == null

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index 14e2b5f..cc76231 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -43,17 +44,17 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler
         localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
     }
 
-    protected DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String
table, Runnable callback)
+    public DatacenterWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
+                                          Collection<InetAddress> pendingEndpoints,
+                                          ConsistencyLevel consistencyLevel,
+                                          String table,
+                                          Runnable callback,
+                                          WriteType writeType)
     {
-        super(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback);
+        super(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback, writeType);
         assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM;
     }
 
-    public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints,
Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String
table, Runnable callback)
-    {
-        return new DatacenterWriteResponseHandler(writeEndpoints, pendingEndpoints, consistencyLevel,
table, callback);
-    }
-
     @Override
     public void response(MessageIn message)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index df6a36a..37e28fb 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -188,7 +188,8 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 else
                 {
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter,
standardWritePerformer, null));
+                    WriteType wt = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH;
+                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter,
standardWritePerformer, null, wt));
                 }
             }
 
@@ -244,7 +245,7 @@ public class StorageProxy implements StorageProxyMBean
      * @param consistency_level the consistency level for the operation
      */
     public static void mutateAtomically(Collection<RowMutation> mutations, ConsistencyLevel
consistency_level)
-    throws UnavailableException, WriteTimeoutException
+    throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         long startTime = System.nanoTime();
 
@@ -259,7 +260,7 @@ public class StorageProxy implements StorageProxyMBean
             // add a handler for each mutation - includes checking availability, but doesn't
initiate any writes, yet
             for (RowMutation mutation : mutations)
             {
-                WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level);
+                WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level,
WriteType.BATCH);
                 // exit early if we can't fulfill the CL at this time.
                 wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
@@ -298,11 +299,12 @@ public class StorageProxy implements StorageProxyMBean
     throws WriteTimeoutException
     {
         RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid);
-        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints,
-                                                                           Collections.<InetAddress>emptyList(),
-                                                                           ConsistencyLevel.ONE,
-                                                                           Table.SYSTEM_KS,
-                                                                           null);
+        AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints,
+                                                                        Collections.<InetAddress>emptyList(),
+                                                                        ConsistencyLevel.ONE,
+                                                                        Table.SYSTEM_KS,
+                                                                        null,
+                                                                        WriteType.BATCH_LOG);
 
         try
         {
@@ -313,21 +315,14 @@ public class StorageProxy implements StorageProxyMBean
             throw new RuntimeException("Error writing to batchlog", e);
         }
 
-        try
-        {
-            handler.get();
-        }
-        catch (WriteTimeoutException e)
-        {
-            throw new WriteTimeoutException(e.consistency, 0, e.blockFor, false);
-        }
+        handler.get();
     }
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints,
UUID uuid)
     {
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
         rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros());
-        AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, Collections.<InetAddress>emptyList(),
ConsistencyLevel.ANY, Table.SYSTEM_KS, null);
+        AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.<InetAddress>emptyList(),
ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE);
 
         try
         {
@@ -342,7 +337,7 @@ public class StorageProxy implements StorageProxyMBean
     private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper>
wrappers,
                                                   String localDataCenter,
                                                   ConsistencyLevel consistencyLevel)
-    throws WriteTimeoutException
+    throws WriteTimeoutException, OverloadedException
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
@@ -355,25 +350,11 @@ public class StorageProxy implements StorageProxyMBean
             {
                 throw new RuntimeException("Error writing key " + ByteBufferUtil.bytesToHex(wrapper.mutation.key()),
e);
             }
-            catch (OverloadedException e)
-            {
-                // turn OE into TOE.
-                throw new WriteTimeoutException(consistencyLevel, -1, 0, true);
-            }
         }
 
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            try
-            {
-                wrapper.handler.get();
-            }
-            catch (WriteTimeoutException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("Write timeout {} for {}", e, wrapper.mutation.toString(true));
-                throw new WriteTimeoutException(e.consistency, -1, e.blockFor, true);
-            }
+            wrapper.handler.get();
         }
     }
 
@@ -395,7 +376,8 @@ public class StorageProxy implements StorageProxyMBean
                                                      ConsistencyLevel consistency_level,
                                                      String localDataCenter,
                                                      WritePerformer performer,
-                                                     Runnable callback)
+                                                     Runnable callback,
+                                                     WriteType writeType)
     throws UnavailableException, OverloadedException, IOException
     {
         String table = mutation.getTable();
@@ -405,7 +387,7 @@ public class StorageProxy implements StorageProxyMBean
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table,
tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
table);
 
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, callback);
+        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, callback, writeType);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
@@ -415,14 +397,14 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     // same as above except does not initiate writes (but does perfrom availability checks).
-    private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation,
ConsistencyLevel consistency_level)
+    private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation,
ConsistencyLevel consistency_level, WriteType writeType)
     {
         AbstractReplicationStrategy rs = Table.open(mutation.getTable()).getReplicationStrategy();
         String table = mutation.getTable();
         Token tk = StorageService.getPartitioner().getToken(mutation.key());
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table,
tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
table);
-        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, null);
+        AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints,
pendingEndpoints, consistency_level, null, writeType);
         return new WriteResponseHandlerWrapper(responseHandler, mutation);
     }
 
@@ -710,10 +692,10 @@ public class StorageProxy implements StorageProxyMBean
             List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(table,
tk);
             Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
table);
 
-            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(),
null).assureSufficientLiveNodes();
+            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(),
null, WriteType.COUNTER).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint);
+            AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint,
WriteType.COUNTER);
 
             if (logger.isDebugEnabled())
                 logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key())
+ " to " + endpoint);
@@ -763,7 +745,7 @@ public class StorageProxy implements StorageProxyMBean
     public static AbstractWriteResponseHandler applyCounterMutationOnLeader(CounterMutation
cm, String localDataCenter, Runnable callback)
     throws UnavailableException, IOException, OverloadedException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer,
callback);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer,
callback, WriteType.COUNTER);
     }
 
     // Same as applyCounterMutationOnLeader but must with the difference that it use the
MUTATION stage to execute the write (while
@@ -771,7 +753,7 @@ public class StorageProxy implements StorageProxyMBean
     public static AbstractWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation
cm, String localDataCenter)
     throws UnavailableException, IOException, OverloadedException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer,
null);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer,
null, WriteType.COUNTER);
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index e61a25f..707a583 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
 
 /**
  * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels.
@@ -42,30 +43,25 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler
     protected final AtomicInteger responses;
     private final int blockFor;
 
-    protected WriteResponseHandler(Collection<InetAddress> writeEndpoints, Collection<InetAddress>
pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback)
+    public WriteResponseHandler(Collection<InetAddress> writeEndpoints,
+                                Collection<InetAddress> pendingEndpoints,
+                                ConsistencyLevel consistencyLevel,
+                                String table,
+                                Runnable callback,
+                                WriteType writeType)
     {
-        super(writeEndpoints, pendingEndpoints, consistencyLevel, callback);
+        super(writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
         blockFor = consistencyLevel.blockFor(table);
         responses = new AtomicInteger(blockFor);
     }
 
-    protected WriteResponseHandler(InetAddress endpoint)
+    public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
     {
-        super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL,
null);
+        super(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ALL,
null, writeType);
         blockFor = 1;
         responses = new AtomicInteger(1);
     }
 
-    public static AbstractWriteResponseHandler create(Collection<InetAddress> writeEndpoints,
Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, String
table, Runnable callback)
-    {
-        return new WriteResponseHandler(writeEndpoints, pendingEndpoints, consistencyLevel,
table, callback);
-    }
-
-    public static AbstractWriteResponseHandler create(InetAddress endpoint)
-    {
-        return new WriteResponseHandler(endpoint);
-    }
-
     public void response(MessageIn m)
     {
         if (responses.decrementAndGet() == 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/thrift/ThriftConversion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
index aa7d236..3105acd 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftConversion.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.thrift;
 
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.exceptions.RequestTimeoutException;
 import org.apache.cassandra.exceptions.RequestValidationException;
@@ -87,8 +88,12 @@ public class ThriftConversion
         TimedOutException toe = new TimedOutException();
         if (e instanceof WriteTimeoutException)
         {
-            toe.setAcknowledged_by(((WriteTimeoutException)e).received);
-            toe.setAcknowledged_by_batchlog(((WriteTimeoutException)e).writtenToBatchlog);
+            WriteTimeoutException wte = (WriteTimeoutException)e;
+            toe.setAcknowledged_by(wte.received);
+            if (wte.writeType == WriteType.BATCH_LOG)
+                toe.setAcknowledged_by_batchlog(false);
+            else if (wte.writeType == WriteType.BATCH)
+                toe.setAcknowledged_by_batchlog(true);
         }
         return toe;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index b7b0bee..29fc5a6 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.transport.CBUtil;
 import org.apache.cassandra.transport.Message;
@@ -76,18 +77,17 @@ public class ErrorMessage extends Message.Response
                     te = new TruncateException(msg);
                     break;
                 case WRITE_TIMEOUT:
+                case READ_TIMEOUT:
+                    ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body));
+                    int received = body.readInt();
+                    int blockFor = body.readInt();
+                    if (code == ExceptionCode.WRITE_TIMEOUT)
                     {
-                        ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body));
-                        int received = body.readInt();
-                        int blockFor = body.readInt();
-                        te = new WriteTimeoutException(cl, received, blockFor, false);
+                        WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body));
+                        te = new WriteTimeoutException(writeType, cl, received, blockFor);
                     }
-                    break;
-                case READ_TIMEOUT:
+                    else
                     {
-                        ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body));
-                        int received = body.readInt();
-                        int blockFor = body.readInt();
                         byte dataPresent = body.readByte();
                         te = new ReadTimeoutException(cl, received, blockFor, dataPresent
!= 0);
                     }
@@ -140,17 +140,29 @@ public class ErrorMessage extends Message.Response
                 case WRITE_TIMEOUT:
                 case READ_TIMEOUT:
                     RequestTimeoutException rte = (RequestTimeoutException)msg.error;
-                    ReadTimeoutException readEx = rte instanceof ReadTimeoutException
-                                                ? (ReadTimeoutException)rte
-                                                : null;
+                    boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT;
+
                     ByteBuffer rteCl = ByteBufferUtil.bytes(rte.consistency.toString());
-                    acb = ChannelBuffers.buffer(2 + rteCl.remaining() + 8 + (readEx == null
? 0 : 1));
+                    ByteBuffer writeType = isWrite
+                                         ? ByteBufferUtil.bytes(((WriteTimeoutException)rte).writeType.toString())
+                                         : null;
+
+                    int extraSize = isWrite  ? 2 + writeType.remaining() : 1;
+                    acb = ChannelBuffers.buffer(2 + rteCl.remaining() + 8 + extraSize);
+
                     acb.writeShort((short)rteCl.remaining());
                     acb.writeBytes(rteCl);
                     acb.writeInt(rte.received);
                     acb.writeInt(rte.blockFor);
-                    if (readEx != null)
-                        acb.writeByte((byte)(readEx.dataPresent ? 1 : 0));
+                    if (isWrite)
+                    {
+                        acb.writeShort((short)writeType.remaining());
+                        acb.writeBytes(writeType);
+                    }
+                    else
+                    {
+                        acb.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 1
: 0));
+                    }
                     break;
                 case UNPREPARED:
                     PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;


Mime
View raw message