Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A2F401048D for ; Wed, 19 Jun 2013 03:41:35 +0000 (UTC) Received: (qmail 93449 invoked by uid 500); 19 Jun 2013 03:41:35 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 93297 invoked by uid 500); 19 Jun 2013 03:41:34 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 93275 invoked by uid 99); 19 Jun 2013 03:41:34 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Jun 2013 03:41:34 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id B646224070; Wed, 19 Jun 2013 03:41:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Wed, 19 Jun 2013 03:41:34 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/3] git commit: Fix cross-DC mutation forwarding patch by jbellis; reviewed by dbrosius for CASSANDRA-5632 Fix cross-DC mutation forwarding patch by jbellis; reviewed by dbrosius for CASSANDRA-5632 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b1d7405f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b1d7405f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b1d7405f Branch: refs/heads/trunk Commit: b1d7405fd1263a04d8cc4bbfcba3ec1928b75738 Parents: 26c4262 Author: Jonathan Ellis Authored: Tue Jun 18 22:20:42 2013 -0500 Committer: Jonathan Ellis Committed: Tue Jun 18 22:20:49 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/RowMutationVerbHandler.java | 11 ++-- src/java/org/apache/cassandra/db/Table.java | 4 +- .../org/apache/cassandra/net/MessageOut.java | 11 ---- .../apache/cassandra/service/StorageProxy.java | 57 ++++++++------------ 5 files changed, 28 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c48eb7d..5d36bd9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.6 + * Fix cross-DC mutation forwarding (CASSANDRA-5632) * Reduce SSTableLoader memory usage (CASSANDRA-5555) * Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272) * (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java index c2126f5..eedd134 100644 --- a/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java +++ b/src/java/org/apache/cassandra/db/RowMutationVerbHandler.java @@ -37,13 +37,13 @@ public class RowMutationVerbHandler implements IVerbHandler try { RowMutation rm = message.payload; - logger.debug("Applying mutation"); // Check if there were any forwarding headers in this message - InetAddress replyTo = message.from; byte[] from = message.parameters.get(RowMutation.FORWARD_FROM); + InetAddress replyTo; if (from == null) { + replyTo = message.from; byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO); if (forwardBytes != null && message.version >= MessagingService.VERSION_11) forwardToLocalNodes(rm, message.verb, forwardBytes, message.from); @@ -73,15 +73,14 @@ public class RowMutationVerbHandler implements IVerbHandler DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(forwardBytes)); int size = dis.readInt(); - // remove fwds from message to avoid infinite loop + // tell the recipients who to send their ack to MessageOut message = new MessageOut(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress()); + // Send a message to each of the addresses on our Forward List for (int i = 0; i < size; i++) { - // Send a message to each of the addresses on our Forward List InetAddress address = CompactEndpointSerializationHelper.deserialize(dis); String id = dis.readUTF(); - logger.debug("Forwarding message to {}@{}", id, address); - // Let the response go back to the coordinator + Tracing.trace("Enqueuing forwarded write to {}", address); MessagingService.instance().sendOneWay(message, id, address); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index 17c510b..99a3446 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -363,10 +363,8 @@ public class Table */ public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes) { - if (!mutation.getTable().equals(Tracing.TRACE_KS)) - Tracing.trace("Acquiring switchLock read lock"); - // write the mutation to the commitlog and memtables + Tracing.trace("Acquiring switchLock read lock"); switchLock.readLock().lock(); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/net/MessageOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java index 14ef377..da9eda4 100644 --- a/src/java/org/apache/cassandra/net/MessageOut.java +++ b/src/java/org/apache/cassandra/net/MessageOut.java @@ -82,17 +82,6 @@ public class MessageOut builder.putAll(parameters).put(key, value); return new MessageOut(verb, payload, serializer, builder.build()); } - - public MessageOut withHeaderRemoved(String key) - { - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Map.Entry entry : parameters.entrySet()) - { - if (!entry.getKey().equals(key)) - builder.put(entry.getKey(), entry.getValue()); - } - return new MessageOut(verb, payload, serializer, builder.build()); - } public Stage getStage() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b1d7405f/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index c12cace..ee045eb 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -204,14 +204,7 @@ public class StorageProxy implements StorageProxyMBean { writeMetrics.timeouts.mark(); ClientRequestMetrics.writeTimeouts.inc(); - if (logger.isDebugEnabled()) - { - List mstrings = new ArrayList(mutations.size()); - for (IMutation mutation : mutations) - mstrings.add(mutation.toString(true)); - logger.debug("Write timeout {} for one (or more) of: {}", ex.toString(), mstrings); - } - Tracing.trace("Write timeout"); + Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); throw ex; } catch (UnavailableException e) @@ -480,8 +473,8 @@ public class StorageProxy implements StorageProxyMBean ConsistencyLevel consistency_level) throws OverloadedException { - // Multimap that holds onto all the messages and addresses meant for a specific datacenter - Map> dcMessages = new HashMap>(); + // replicas grouped by datacenter + Map> dcGroups = null; for (InetAddress destination : targets) { @@ -506,13 +499,15 @@ public class StorageProxy implements StorageProxyMBean { // belongs on a different server String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(destination); - Multimap messages = dcMessages.get(dc); - if (messages == null) + Collection dcTargets = (dcGroups != null) ? dcGroups.get(dc) : null; + if (dcTargets == null) { - messages = HashMultimap.create(); - dcMessages.put(dc, messages); + dcTargets = new ArrayList(3); // most DCs will have <= 3 replicas + if (dcGroups == null) + dcGroups = new HashMap>(); + dcGroups.put(dc, dcTargets); } - messages.put(rm.createMessage(), destination); + dcTargets.add(destination); } } else @@ -525,7 +520,17 @@ public class StorageProxy implements StorageProxyMBean } } - sendMessages(localDataCenter, dcMessages, responseHandler); + if (dcGroups != null) + { + MessageOut message = rm.createMessage(); + // for each datacenter, send the message to one node to relay the write to other replicas + for (Map.Entry> entry: dcGroups.entrySet()) + { + boolean isLocalDC = entry.getKey().equals(localDataCenter); + Collection dcTargets = entry.getValue(); + sendMessagesToOneDC(message, dcTargets, isLocalDC, responseHandler); + } + } } public static Future submitHint(final RowMutation mutation, @@ -580,26 +585,6 @@ public class StorageProxy implements StorageProxyMBean totalHints.incrementAndGet(); } - /** - * for each datacenter, send a message to one node to relay the write to other replicas - */ - private static void sendMessages(String localDataCenter, Map> dcMessages, AbstractWriteResponseHandler handler) - { - for (Map.Entry> entry: dcMessages.entrySet()) - { - boolean isLocalDC = entry.getKey().equals(localDataCenter); - for (Map.Entry> messages: entry.getValue().asMap().entrySet()) - { - MessageOut message = messages.getKey(); - Collection targets = messages.getValue(); - // a single message object is used for unhinted writes, so clean out any forwards - // from previous loop iterations - message = message.withHeaderRemoved(RowMutation.FORWARD_TO); - sendMessagesToOneDC(message, targets, isLocalDC, handler); - } - } - } - private static void sendMessagesToOneDC(MessageOut message, Collection targets, boolean localDC, AbstractWriteResponseHandler handler) { try