Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 26434 invoked from network); 11 Jan 2011 23:36:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Jan 2011 23:36:32 -0000 Received: (qmail 47573 invoked by uid 500); 11 Jan 2011 23:36:32 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 47478 invoked by uid 500); 11 Jan 2011 23:36:32 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 47468 invoked by uid 99); 11 Jan 2011 23:36:32 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jan 2011 23:36:32 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jan 2011 23:36:31 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 956A223888DD; Tue, 11 Jan 2011 23:36:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1057929 - in /cassandra/branches/cassandra-0.7: CHANGES.txt src/java/org/apache/cassandra/net/MessagingService.java Date: Tue, 11 Jan 2011 23:36:05 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110111233605.956A223888DD@eris.apache.org> Author: jbellis Date: Tue Jan 11 23:36:05 2011 New Revision: 1057929 URL: http://svn.apache.org/viewvc?rev=1057929&view=rev Log: fix race condition in MessagingService.targets patch by jbellis; reviewed by Folke Behrens for CASSANDRA-1959 Modified: cassandra/branches/cassandra-0.7/CHANGES.txt cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Modified: cassandra/branches/cassandra-0.7/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1057929&r1=1057928&r2=1057929&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.7/CHANGES.txt Tue Jan 11 23:36:05 2011 @@ -18,6 +18,7 @@ * fix CFMetaData.apply to only compare objects of the same class (CASSANDRA-1962) * allow specifying specific SSTables to compact from JMX (CASSANDRA-1963) + * fix race condition in MessagingService.targets (CASSANDRA-1959) 0.7.0-dev Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java?rev=1057929&r1=1057928&r2=1057929&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/net/MessagingService.java Tue Jan 11 23:36:05 2011 @@ -30,6 +30,7 @@ import java.nio.channels.AsynchronousClo import java.nio.channels.ServerSocketChannel; import java.security.MessageDigest; import java.util.*; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -58,6 +59,7 @@ import org.apache.cassandra.utils.Expiri import org.apache.cassandra.utils.GuidGenerator; import org.apache.cassandra.utils.SimpleCondition; import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.cliffc.high_scale_lib.NonBlockingHashSet; public class MessagingService implements MessagingServiceMBean, ILatencyPublisher { @@ -70,7 +72,7 @@ public class MessagingService implements /* This records all the results mapped by message Id */ private static ExpiringMap callbacks; - private static Multimap targets; + private static ConcurrentMap> targets = new NonBlockingHashMap>(); /* Lookup table for registering message handlers based on the verb. */ private static Map verbHandlers_; @@ -127,7 +129,7 @@ public class MessagingService implements { public Object apply(String messageId) { - Collection addresses = targets.removeAll(messageId); + Collection addresses = targets.remove(messageId); if (addresses == null) return null; @@ -140,7 +142,6 @@ public class MessagingService implements return null; } }; - targets = ArrayListMultimap.create(); callbacks = new ExpiringMap((long) (1.1 * DatabaseDescriptor.getRpcTimeout()), timeoutReporter); MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); @@ -255,12 +256,33 @@ public class MessagingService implements addCallback(cb, messageId); for (InetAddress endpoint : to) { - targets.put(messageId, endpoint); + putTarget(messageId, endpoint); sendOneWay(message, endpoint); } return messageId; } + private void putTarget(String messageId, InetAddress endpoint) + { + Collection addresses = targets.get(messageId); + if (addresses == null) + { + addresses = new NonBlockingHashSet(); + Collection oldAddresses = targets.putIfAbsent(messageId, addresses); + if (oldAddresses != null) + addresses = oldAddresses; + } + addresses.add(endpoint); + } + + private static void removeTarget(String messageId, InetAddress from) + { + Collection addresses = targets.get(messageId); + // null is expected if we removed the callback or we got a reply after its timeout expired + if (addresses != null) + addresses.remove(from); + } + public void addCallback(IAsyncCallback cb, String messageId) { callbacks.put(messageId, cb); @@ -280,7 +302,7 @@ public class MessagingService implements { String messageId = message.getMessageId(); addCallback(cb, messageId); - targets.put(messageId, to); + putTarget(messageId, to); sendOneWay(message, to); return messageId; } @@ -307,7 +329,7 @@ public class MessagingService implements for ( int i = 0; i < messages.length; ++i ) { messages[i].setMessageId(groupId); - targets.put(groupId, to.get(i)); + putTarget(groupId, to.get(i)); sendOneWay(messages[i], to.get(i)); } return groupId; @@ -361,7 +383,7 @@ public class MessagingService implements { IAsyncResult iar = new AsyncResult(); callbacks.put(message.getMessageId(), iar); - targets.put(message.getMessageId(), to); + putTarget(message.getMessageId(), to); sendOneWay(message, to); return iar; } @@ -429,7 +451,7 @@ public class MessagingService implements public static IMessageCallback removeRegisteredCallback(String messageId) { - targets.removeAll(messageId); // TODO fix this when we clean up quorum reads to do proper RR + targets.remove(messageId); // TODO fix this when we clean up quorum reads to do proper RR return callbacks.remove(messageId); } @@ -440,7 +462,7 @@ public class MessagingService implements public static void responseReceivedFrom(String messageId, InetAddress from) { - targets.remove(messageId, from); + removeTarget(messageId, from); } public static void validateMagic(int magic) throws IOException