Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A2CF2D414 for ; Mon, 8 Oct 2012 08:12:56 +0000 (UTC) Received: (qmail 4873 invoked by uid 500); 8 Oct 2012 08:12:56 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 4561 invoked by uid 500); 8 Oct 2012 08:12:50 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 4519 invoked by uid 99); 8 Oct 2012 08:12:49 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Oct 2012 08:12:49 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D8F133C237; Mon, 8 Oct 2012 08:12:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: Improve write timeout exceptions Message-Id: <20121008081248.D8F133C237@tyr.zones.apache.org> Date: Mon, 8 Oct 2012 08:12:48 +0000 (UTC) Updated Branches: refs/heads/trunk 801d7d3f5 -> ee5aafe6a Improve write timeout exceptions patch by slebresne; reviewed by jbellis for CASSANDRA-4723 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ee5aafe6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ee5aafe6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ee5aafe6 Branch: refs/heads/trunk Commit: ee5aafe6a4cb409f16c48edcb3a1682c7d6b400e Parents: 801d7d3 Author: Sylvain Lebresne Authored: Tue Sep 25 18:23:05 2012 +0200 
Committer: Sylvain Lebresne Committed: Mon Oct 8 10:11:23 2012 +0200 ---------------------------------------------------------------------- doc/native_protocol.spec | 18 ++++- .../org/apache/cassandra/db/CounterColumn.java | 2 +- .../apache/cassandra/db/HintedHandOffManager.java | 2 +- src/java/org/apache/cassandra/db/WriteType.java | 27 ++++++ .../exceptions/WriteTimeoutException.java | 7 +- .../locator/AbstractReplicationStrategy.java | 9 +- .../service/AbstractWriteResponseHandler.java | 14 +++- .../DatacenterSyncWriteResponseHandler.java | 15 ++-- .../service/DatacenterWriteResponseHandler.java | 15 ++-- .../org/apache/cassandra/service/StorageProxy.java | 64 +++++--------- .../cassandra/service/WriteResponseHandler.java | 24 +++--- .../apache/cassandra/thrift/ThriftConversion.java | 9 ++- .../cassandra/transport/messages/ErrorMessage.java | 42 ++++++---- 13 files changed, 148 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/doc/native_protocol.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec index 71a7c71..a7fa7cb 100644 --- a/doc/native_protocol.spec +++ b/doc/native_protocol.spec @@ -524,7 +524,7 @@ Table of Contents 0x1003 Truncate_error: error during a truncation error. 0x1100 Write_timeout: Timeout exception during a write request. The rest of the ERROR message body will be - + where: is a [string] representing the consistency level of the query having triggered the exception. @@ -532,6 +532,22 @@ Table of Contents acknowledged the request. is the number of replica whose acknowledgement is required to achieve . + is a [string] that describes the type of the write + that timed out. The value of that string can be one + of: + - "SIMPLE": the write was a non-batched + non-counter write. + - "BATCH": the write was a (logged) batch write. 
+ If this type is received, it means the batch log + has been successfully written (otherwise a + "BATCH_LOG" type would have been sent instead). + - "UNLOGGED_BATCH": the write was an unlogged + batch. No batch log write has been attempted. + - "COUNTER": the write was a counter write + (batched or not). + - "BATCH_LOG": the timeout occurred during the + write to the batch log when a (logged) batch + write was requested. 0x1200 Read_timeout: Timeout exception during a read request. The rest of the ERROR message body will be http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/db/CounterColumn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CounterColumn.java b/src/java/org/apache/cassandra/db/CounterColumn.java index 31e9c36..1c5fdcd 100644 --- a/src/java/org/apache/cassandra/db/CounterColumn.java +++ b/src/java/org/apache/cassandra/db/CounterColumn.java @@ -377,7 +377,7 @@ public class CounterColumn extends Column responseHandler.response(null); StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter, consistency_level); } - }, null); + }, null, WriteType.SIMPLE); // we don't wait for answers } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index e5ff163..119dd16 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -126,7 +126,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private static void sendMutation(InetAddress endpoint, MessageOut message) throws WriteTimeoutException { - AbstractWriteResponseHandler 
responseHandler = WriteResponseHandler.create(endpoint); + AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.UNLOGGED_BATCH); MessagingService.instance().sendRR(message, endpoint, responseHandler); responseHandler.get(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/db/WriteType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/WriteType.java b/src/java/org/apache/cassandra/db/WriteType.java new file mode 100644 index 0000000..b96585d --- /dev/null +++ b/src/java/org/apache/cassandra/db/WriteType.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +public enum WriteType +{ + SIMPLE, + BATCH, + UNLOGGED_BATCH, + COUNTER, + BATCH_LOG; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java index c6066f6..2802fe9 100644 --- a/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java +++ b/src/java/org/apache/cassandra/exceptions/WriteTimeoutException.java @@ -22,14 +22,15 @@ import java.util.Set; import java.nio.ByteBuffer; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; public class WriteTimeoutException extends RequestTimeoutException { - public final boolean writtenToBatchlog; + public final WriteType writeType; - public WriteTimeoutException(ConsistencyLevel consistency, int received, int blockFor, boolean writtenToBatchlog) + public WriteTimeoutException(WriteType writeType, ConsistencyLevel consistency, int received, int blockFor) { super(ExceptionCode.WRITE_TIMEOUT, consistency, received, blockFor); - this.writtenToBatchlog = writtenToBatchlog; + this.writeType = writeType; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index 630aac4..c6adad3 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import 
org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.RingPosition; import org.apache.cassandra.dht.Token; @@ -114,18 +115,18 @@ public abstract class AbstractReplicationStrategy */ public abstract List calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); - public AbstractWriteResponseHandler getWriteResponseHandler(Collection naturalEndpoints, Collection pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback) + public AbstractWriteResponseHandler getWriteResponseHandler(Collection naturalEndpoints, Collection pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback, WriteType writeType) { if (consistency_level == ConsistencyLevel.LOCAL_QUORUM) { // block for in this context will be localnodes block. - return DatacenterWriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback); + return new DatacenterWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType); } else if (consistency_level == ConsistencyLevel.EACH_QUORUM) { - return DatacenterSyncWriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback); + return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType); } - return WriteResponseHandler.create(naturalEndpoints, pendingEndpoints, consistency_level, table, callback); + return new WriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, table, callback, writeType); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 76eeb0d..2ad2849 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -22,11 +22,11 @@ import java.util.Collection; import java.util.concurrent.TimeUnit; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.exceptions.UnavailableException; -import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.exceptions.*; import org.apache.cassandra.net.IAsyncCallback; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.utils.SimpleCondition; public abstract class AbstractWriteResponseHandler implements IAsyncCallback @@ -37,18 +37,24 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback protected final ConsistencyLevel consistencyLevel; protected final Runnable callback; protected final Collection pendingEndpoints; + private final WriteType writeType; /** * @param pendingEndpoints * @param callback A callback to be called when the write is successful. 
*/ - protected AbstractWriteResponseHandler(Collection naturalEndpoints, Collection pendingEndpoints, ConsistencyLevel consistencyLevel, Runnable callback) + protected AbstractWriteResponseHandler(Collection naturalEndpoints, + Collection pendingEndpoints, + ConsistencyLevel consistencyLevel, + Runnable callback, + WriteType writeType) { this.pendingEndpoints = pendingEndpoints; startTime = System.currentTimeMillis(); this.consistencyLevel = consistencyLevel; this.naturalEndpoints = naturalEndpoints; this.callback = callback; + this.writeType = writeType; } public void get() throws WriteTimeoutException @@ -66,7 +72,7 @@ public abstract class AbstractWriteResponseHandler implements IAsyncCallback } if (!success) - throw new WriteTimeoutException(consistencyLevel, ackCount(), blockFor(), false); + throw new WriteTimeoutException(writeType, consistencyLevel, ackCount(), blockFor()); } protected abstract int ackCount(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java index ffee975..995b5bf 100644 --- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java @@ -33,6 +33,7 @@ import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.NetworkTopologyStrategy; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.utils.FBUtilities; /** @@ -52,10 +53,15 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan private final NetworkTopologyStrategy strategy; private final 
HashMap responses = new HashMap(); - protected DatacenterSyncWriteResponseHandler(Collection naturalEndpoints, Collection pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) + public DatacenterSyncWriteResponseHandler(Collection naturalEndpoints, + Collection pendingEndpoints, + ConsistencyLevel consistencyLevel, + String table, + Runnable callback, + WriteType writeType) { // Response is been managed by the map so make it 1 for the superclass. - super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback); + super(naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType); assert consistencyLevel == ConsistencyLevel.EACH_QUORUM; this.table = table; @@ -68,11 +74,6 @@ public class DatacenterSyncWriteResponseHandler extends AbstractWriteResponseHan } } - public static AbstractWriteResponseHandler create(Collection naturalEndpoints, Collection pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) - { - return new DatacenterSyncWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback); - } - public void response(MessageIn message) { String dataCenter = message == null http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java index 14e2b5f..cc76231 100644 --- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java @@ -28,6 +28,7 @@ import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; +import 
org.apache.cassandra.db.WriteType; import org.apache.cassandra.utils.FBUtilities; /** @@ -43,17 +44,17 @@ public class DatacenterWriteResponseHandler extends WriteResponseHandler localdc = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); } - protected DatacenterWriteResponseHandler(Collection naturalEndpoints, Collection pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) + public DatacenterWriteResponseHandler(Collection naturalEndpoints, + Collection pendingEndpoints, + ConsistencyLevel consistencyLevel, + String table, + Runnable callback, + WriteType writeType) { - super(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback); + super(naturalEndpoints, pendingEndpoints, consistencyLevel, table, callback, writeType); assert consistencyLevel == ConsistencyLevel.LOCAL_QUORUM; } - public static AbstractWriteResponseHandler create(Collection writeEndpoints, Collection pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) - { - return new DatacenterWriteResponseHandler(writeEndpoints, pendingEndpoints, consistencyLevel, table, callback); - } - @Override public void response(MessageIn message) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index df6a36a..37e28fb 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -188,7 +188,8 @@ public class StorageProxy implements StorageProxyMBean } else { - responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null)); + WriteType wt = mutations.size() <= 1 ? 
WriteType.SIMPLE : WriteType.UNLOGGED_BATCH; + responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt)); } } @@ -244,7 +245,7 @@ public class StorageProxy implements StorageProxyMBean * @param consistency_level the consistency level for the operation */ public static void mutateAtomically(Collection mutations, ConsistencyLevel consistency_level) - throws UnavailableException, WriteTimeoutException + throws UnavailableException, OverloadedException, WriteTimeoutException { long startTime = System.nanoTime(); @@ -259,7 +260,7 @@ public class StorageProxy implements StorageProxyMBean // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (RowMutation mutation : mutations) { - WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level); + WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level, WriteType.BATCH); // exit early if we can't fulfill the CL at this time. 
wrapper.handler.assureSufficientLiveNodes(); wrappers.add(wrapper); @@ -298,11 +299,12 @@ public class StorageProxy implements StorageProxyMBean throws WriteTimeoutException { RowMutation rm = BatchlogManager.getBatchlogMutationFor(mutations, uuid); - AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, - Collections.emptyList(), - ConsistencyLevel.ONE, - Table.SYSTEM_KS, - null); + AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, + Collections.emptyList(), + ConsistencyLevel.ONE, + Table.SYSTEM_KS, + null, + WriteType.BATCH_LOG); try { @@ -313,21 +315,14 @@ public class StorageProxy implements StorageProxyMBean throw new RuntimeException("Error writing to batchlog", e); } - try - { - handler.get(); - } - catch (WriteTimeoutException e) - { - throw new WriteTimeoutException(e.consistency, 0, e.blockFor, false); - } + handler.get(); } private static void asyncRemoveFromBatchlog(Collection endpoints, UUID uuid) { RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid)); rm.delete(new QueryPath(SystemTable.BATCHLOG_CF), FBUtilities.timestampMicros()); - AbstractWriteResponseHandler handler = WriteResponseHandler.create(endpoints, Collections.emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null); + AbstractWriteResponseHandler handler = new WriteResponseHandler(endpoints, Collections.emptyList(), ConsistencyLevel.ANY, Table.SYSTEM_KS, null, WriteType.SIMPLE); try { @@ -342,7 +337,7 @@ public class StorageProxy implements StorageProxyMBean private static void syncWriteBatchedMutations(List wrappers, String localDataCenter, ConsistencyLevel consistencyLevel) - throws WriteTimeoutException + throws WriteTimeoutException, OverloadedException { for (WriteResponseHandlerWrapper wrapper : wrappers) { @@ -355,25 +350,11 @@ public class StorageProxy implements StorageProxyMBean { throw new RuntimeException("Error writing key " + ByteBufferUtil.bytesToHex(wrapper.mutation.key()), e); } - catch 
(OverloadedException e) - { - // turn OE into TOE. - throw new WriteTimeoutException(consistencyLevel, -1, 0, true); - } } for (WriteResponseHandlerWrapper wrapper : wrappers) { - try - { - wrapper.handler.get(); - } - catch (WriteTimeoutException e) - { - if (logger.isDebugEnabled()) - logger.debug("Write timeout {} for {}", e, wrapper.mutation.toString(true)); - throw new WriteTimeoutException(e.consistency, -1, e.blockFor, true); - } + wrapper.handler.get(); } } @@ -395,7 +376,8 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistency_level, String localDataCenter, WritePerformer performer, - Runnable callback) + Runnable callback, + WriteType writeType) throws UnavailableException, OverloadedException, IOException { String table = mutation.getTable(); @@ -405,7 +387,7 @@ public class StorageProxy implements StorageProxyMBean List naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk); Collection pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table); - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback); + AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType); // exit early if we can't fulfill the CL at this time responseHandler.assureSufficientLiveNodes(); @@ -415,14 +397,14 @@ public class StorageProxy implements StorageProxyMBean } // same as above except does not initiate writes (but does perfrom availability checks). 
- private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level) + private static WriteResponseHandlerWrapper wrapResponseHandler(RowMutation mutation, ConsistencyLevel consistency_level, WriteType writeType) { AbstractReplicationStrategy rs = Table.open(mutation.getTable()).getReplicationStrategy(); String table = mutation.getTable(); Token tk = StorageService.getPartitioner().getToken(mutation.key()); List naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk); Collection pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table); - AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null); + AbstractWriteResponseHandler responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); return new WriteResponseHandlerWrapper(responseHandler, mutation); } @@ -710,10 +692,10 @@ public class StorageProxy implements StorageProxyMBean List naturalEndpoints = StorageService.instance.getNaturalEndpoints(table, tk); Collection pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, table); - rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null).assureSufficientLiveNodes(); + rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes(); // Forward the actual update to the chosen leader replica - AbstractWriteResponseHandler responseHandler = WriteResponseHandler.create(endpoint); + AbstractWriteResponseHandler responseHandler = new WriteResponseHandler(endpoint, WriteType.COUNTER); if (logger.isDebugEnabled()) logger.debug("forwarding counter update of key " + ByteBufferUtil.bytesToHex(cm.key()) + " to " + endpoint); @@ -763,7 +745,7 @@ public class StorageProxy implements StorageProxyMBean public static 
AbstractWriteResponseHandler applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback) throws UnavailableException, IOException, OverloadedException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback); + return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER); } // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while @@ -771,7 +753,7 @@ public class StorageProxy implements StorageProxyMBean public static AbstractWriteResponseHandler applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter) throws UnavailableException, IOException, OverloadedException { - return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null); + return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER); } private static Runnable counterWriteTask(final IMutation mutation, http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/service/WriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index e61a25f..707a583 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -31,6 +31,7 @@ import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; /** * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. 
@@ -42,30 +43,25 @@ public class WriteResponseHandler extends AbstractWriteResponseHandler protected final AtomicInteger responses; private final int blockFor; - protected WriteResponseHandler(Collection writeEndpoints, Collection pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) + public WriteResponseHandler(Collection writeEndpoints, + Collection pendingEndpoints, + ConsistencyLevel consistencyLevel, + String table, + Runnable callback, + WriteType writeType) { - super(writeEndpoints, pendingEndpoints, consistencyLevel, callback); + super(writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType); blockFor = consistencyLevel.blockFor(table); responses = new AtomicInteger(blockFor); } - protected WriteResponseHandler(InetAddress endpoint) + public WriteResponseHandler(InetAddress endpoint, WriteType writeType) { - super(Arrays.asList(endpoint), Collections.emptyList(), ConsistencyLevel.ALL, null); + super(Arrays.asList(endpoint), Collections.emptyList(), ConsistencyLevel.ALL, null, writeType); blockFor = 1; responses = new AtomicInteger(1); } - public static AbstractWriteResponseHandler create(Collection writeEndpoints, Collection pendingEndpoints, ConsistencyLevel consistencyLevel, String table, Runnable callback) - { - return new WriteResponseHandler(writeEndpoints, pendingEndpoints, consistencyLevel, table, callback); - } - - public static AbstractWriteResponseHandler create(InetAddress endpoint) - { - return new WriteResponseHandler(endpoint); - } - public void response(MessageIn m) { if (responses.decrementAndGet() == 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/thrift/ThriftConversion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftConversion.java b/src/java/org/apache/cassandra/thrift/ThriftConversion.java index aa7d236..3105acd 100644 --- 
a/src/java/org/apache/cassandra/thrift/ThriftConversion.java +++ b/src/java/org/apache/cassandra/thrift/ThriftConversion.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.thrift; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestTimeoutException; import org.apache.cassandra.exceptions.RequestValidationException; @@ -87,8 +88,12 @@ public class ThriftConversion TimedOutException toe = new TimedOutException(); if (e instanceof WriteTimeoutException) { - toe.setAcknowledged_by(((WriteTimeoutException)e).received); - toe.setAcknowledged_by_batchlog(((WriteTimeoutException)e).writtenToBatchlog); + WriteTimeoutException wte = (WriteTimeoutException)e; + toe.setAcknowledged_by(wte.received); + if (wte.writeType == WriteType.BATCH_LOG) + toe.setAcknowledged_by_batchlog(false); + else if (wte.writeType == WriteType.BATCH) + toe.setAcknowledged_by_batchlog(true); } return toe; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ee5aafe6/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index b7b0bee..29fc5a6 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.transport.CBUtil; import org.apache.cassandra.transport.Message; @@ -76,18 +77,17 @@ public class ErrorMessage extends Message.Response te = new TruncateException(msg); break; case WRITE_TIMEOUT: + case READ_TIMEOUT: + ConsistencyLevel cl = 
Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body)); + int received = body.readInt(); + int blockFor = body.readInt(); + if (code == ExceptionCode.WRITE_TIMEOUT) { - ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body)); - int received = body.readInt(); - int blockFor = body.readInt(); - te = new WriteTimeoutException(cl, received, blockFor, false); + WriteType writeType = Enum.valueOf(WriteType.class, CBUtil.readString(body)); + te = new WriteTimeoutException(writeType, cl, received, blockFor); } - break; - case READ_TIMEOUT: + else { - ConsistencyLevel cl = Enum.valueOf(ConsistencyLevel.class, CBUtil.readString(body)); - int received = body.readInt(); - int blockFor = body.readInt(); byte dataPresent = body.readByte(); te = new ReadTimeoutException(cl, received, blockFor, dataPresent != 0); } @@ -140,17 +140,29 @@ public class ErrorMessage extends Message.Response case WRITE_TIMEOUT: case READ_TIMEOUT: RequestTimeoutException rte = (RequestTimeoutException)msg.error; - ReadTimeoutException readEx = rte instanceof ReadTimeoutException - ? (ReadTimeoutException)rte - : null; + boolean isWrite = msg.error.code() == ExceptionCode.WRITE_TIMEOUT; + ByteBuffer rteCl = ByteBufferUtil.bytes(rte.consistency.toString()); - acb = ChannelBuffers.buffer(2 + rteCl.remaining() + 8 + (readEx == null ? 0 : 1)); + ByteBuffer writeType = isWrite + ? ByteBufferUtil.bytes(((WriteTimeoutException)rte).writeType.toString()) + : null; + + int extraSize = isWrite ? 2 + writeType.remaining() : 1; + acb = ChannelBuffers.buffer(2 + rteCl.remaining() + 8 + extraSize); + acb.writeShort((short)rteCl.remaining()); acb.writeBytes(rteCl); acb.writeInt(rte.received); acb.writeInt(rte.blockFor); - if (readEx != null) - acb.writeByte((byte)(readEx.dataPresent ? 1 : 0)); + if (isWrite) + { + acb.writeShort((short)writeType.remaining()); + acb.writeBytes(writeType); + } + else + { + acb.writeByte((byte)(((ReadTimeoutException)rte).dataPresent ? 
1 : 0)); + } break; case UNPREPARED: PreparedQueryNotFoundException pqnfe = (PreparedQueryNotFoundException)msg.error;