Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 14075 invoked from network); 10 Jun 2010 16:19:10 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 10 Jun 2010 16:19:10 -0000 Received: (qmail 42523 invoked by uid 500); 10 Jun 2010 16:19:10 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 42509 invoked by uid 500); 10 Jun 2010 16:19:10 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 42501 invoked by uid 99); 10 Jun 2010 16:19:10 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Jun 2010 16:19:10 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 Jun 2010 16:19:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8207B2388993; Thu, 10 Jun 2010 16:18:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r953373 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ Date: Thu, 10 Jun 2010 16:18:21 -0000 To: commits@cassandra.apache.org From: gdusbabek@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100610161821.8207B2388993@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gdusbabek Date: Thu Jun 10 16:18:20 2010 New Revision: 953373 URL: http://svn.apache.org/viewvc?rev=953373&view=rev Log: restructure the startup ordering of Gossiper and MessageService to avoid timing anomalies. patch by Matthew Dennis, reviewed by Gary Dusbabek. CASSANDRA-1160 Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=953373&r1=953372&r2=953373&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Jun 10 16:18:20 2010 @@ -16,6 +16,8 @@ (CASSANDRA-1146) * use generation time to resolve node token reassignment disagreements (CASSANDRA-1118) + * restructure the startup ordering of Gossiper and MessageService to avoid + timing anomalies (CASSANDRA-1160) 0.6.2 Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=953373&view=auto ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java (added) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java Thu Jun 10 16:18:20 2010 @@ -0,0 +1,39 @@ +package org.apache.cassandra.gms; + +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.log4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Map; + +public class GossipDigestAck2VerbHandler implements IVerbHandler +{ + private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class); + + public void doVerb(Message message) + { + InetAddress from = message.getFrom(); + if (logger_.isTraceEnabled()) + logger_.trace("Received a GossipDigestAck2Message from " + from); + + byte[] bytes = message.getMessageBody(); + DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); + GossipDigestAck2Message gDigestAck2Message; + try + { + gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + Map remoteEpStateMap = gDigestAck2Message.getEndPointStateMap(); + /* Notify the Failure Detector */ + Gossiper.instance.notifyFailureDetector(remoteEpStateMap); + Gossiper.instance.applyStateLocally(remoteEpStateMap); + } +} Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=953373&view=auto ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java (added) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java Thu Jun 10 16:18:20 2010 @@ -0,0 +1,63 @@ +package org.apache.cassandra.gms; + +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.log4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class GossipDigestAckVerbHandler implements IVerbHandler +{ + private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class); + + public void doVerb(Message message) + { + InetAddress from = message.getFrom(); + if (logger_.isTraceEnabled()) + logger_.trace("Received a GossipDigestAckMessage from " + from); + + byte[] bytes = message.getMessageBody(); + DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); + + try + { + GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis); + List gDigestList = gDigestAckMessage.getGossipDigestList(); + Map epStateMap = gDigestAckMessage.getEndPointStateMap(); + + if ( epStateMap.size() > 0 ) + { + /* Notify the Failure Detector */ + Gossiper.instance.notifyFailureDetector(epStateMap); + Gossiper.instance.applyStateLocally(epStateMap); + } + + /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ + Map deltaEpStateMap = new HashMap(); + for( GossipDigest gDigest : gDigestList ) + { + InetAddress addr = gDigest.getEndPoint(); + EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion()); + if ( localEpStatePtr != null ) + deltaEpStateMap.put(addr, localEpStatePtr); + } + + GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap); + Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2); + if (logger_.isTraceEnabled()) + logger_.trace("Sending a GossipDigestAck2Message to " + from); + MessagingService.instance.sendOneWay(gDigestAck2Message, from); + } + catch ( IOException e ) + { + throw new RuntimeException(e); + } + } +} Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=953373&view=auto ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java (added) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java Thu Jun 10 16:18:20 2010 @@ -0,0 +1,102 @@ +package org.apache.cassandra.gms; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.log4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetAddress; +import java.util.*; + +public class GossipDigestSynVerbHandler implements IVerbHandler +{ + private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class); + + public void doVerb(Message message) + { + InetAddress from = message.getFrom(); + if (logger_.isTraceEnabled()) + logger_.trace("Received a GossipDigestSynMessage from " + from); + + byte[] bytes = message.getMessageBody(); + DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); + + try + { + GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis); + /* If the message is from a different cluster throw it away. */ + if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) ) + { + logger_.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName()); + return; + } + + List gDigestList = gDigestMessage.getGossipDigests(); + /* Notify the Failure Detector */ + Gossiper.instance.notifyFailureDetector(gDigestList); + + doSort(gDigestList); + + List deltaGossipDigestList = new ArrayList(); + Map deltaEpStateMap = new HashMap(); + Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap); + + GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap); + Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck); + if (logger_.isTraceEnabled()) + logger_.trace("Sending a GossipDigestAckMessage to " + from); + MessagingService.instance.sendOneWay(gDigestAckMessage, from); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /* + * First construct a map whose key is the endpoint in the GossipDigest and the value is the + * GossipDigest itself. Then build a list of version differences i.e difference between the + * version in the GossipDigest and the version in the local state for a given InetAddress. + * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding + * to the endpoint from the map that was initially constructed. + */ + private void doSort(List gDigestList) + { + /* Construct a map of endpoint to GossipDigest. */ + Map epToDigestMap = new HashMap(); + for ( GossipDigest gDigest : gDigestList ) + { + epToDigestMap.put(gDigest.getEndPoint(), gDigest); + } + + /* + * These digests have their maxVersion set to the difference of the version + * of the local EndPointState and the version found in the GossipDigest. + */ + List diffDigests = new ArrayList(); + for ( GossipDigest gDigest : gDigestList ) + { + InetAddress ep = gDigest.getEndPoint(); + EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep); + int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0; + int diffVersion = Math.abs(version - gDigest.getMaxVersion() ); + diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) ); + } + + gDigestList.clear(); + Collections.sort(diffDigests); + int size = diffDigests.size(); + /* + * Report the digests in descending order. This takes care of the endpoints + * that are far behind w.r.t this local endpoint + */ + for ( int i = size - 1; i >= 0; --i ) + { + gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) ); + } + } +} Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java?rev=953373&r1=953372&r2=953373&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jun 10 16:18:20 2010 @@ -25,7 +25,6 @@ import java.net.InetAddress; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; @@ -51,6 +50,9 @@ public class Gossiper implements IFailur { try { + //wait on messaging service to start listening + MessagingService.instance.waitUntilListening(); + synchronized( Gossiper.instance ) { /* Update the local heartbeat counter. */ @@ -483,7 +485,7 @@ public class Gossiper implements IFailur } /* - * This method is called only from the JoinVerbHandler. This happens + * This method is called only from the GossiperJoinVerbHandler. This happens * when a new node coming up multicasts the JoinMessage. Here we need * to add the endPoint to the list of live endpoints. */ @@ -873,203 +875,4 @@ public class Gossiper implements IFailur gossipTimer_.cancel(); gossipTimer_ = new Timer(false); // makes the Gossiper reentrant. } - - public static class JoinVerbHandler implements IVerbHandler - { - private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class); - - public void doVerb(Message message) - { - InetAddress from = message.getFrom(); - if (logger_.isDebugEnabled()) - logger_.debug("Received a JoinMessage from " + from); - - byte[] bytes = message.getMessageBody(); - DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); - - JoinMessage joinMessage; - try - { - joinMessage = JoinMessage.serializer().deserialize(dis); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) ) - { - Gossiper.instance.join(from); - } - else - { - logger_.warn("ClusterName mismatch from " + from + " " + joinMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName()); - } - } - } - - public static class GossipDigestSynVerbHandler implements IVerbHandler - { - private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class); - - public void doVerb(Message message) - { - InetAddress from = message.getFrom(); - if (logger_.isTraceEnabled()) - logger_.trace("Received a GossipDigestSynMessage from " + from); - - byte[] bytes = message.getMessageBody(); - DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); - - try - { - GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis); - /* If the message is from a different cluster throw it away. */ - if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) ) - { - logger_.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName()); - return; - } - - List gDigestList = gDigestMessage.getGossipDigests(); - /* Notify the Failure Detector */ - Gossiper.instance.notifyFailureDetector(gDigestList); - - doSort(gDigestList); - - List deltaGossipDigestList = new ArrayList(); - Map deltaEpStateMap = new HashMap(); - Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap); - - GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap); - Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck); - if (logger_.isTraceEnabled()) - logger_.trace("Sending a GossipDigestAckMessage to " + from); - MessagingService.instance.sendOneWay(gDigestAckMessage, from); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - /* - * First construct a map whose key is the endpoint in the GossipDigest and the value is the - * GossipDigest itself. Then build a list of version differences i.e difference between the - * version in the GossipDigest and the version in the local state for a given InetAddress. - * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding - * to the endpoint from the map that was initially constructed. - */ - private void doSort(List gDigestList) - { - /* Construct a map of endpoint to GossipDigest. */ - Map epToDigestMap = new HashMap(); - for ( GossipDigest gDigest : gDigestList ) - { - epToDigestMap.put(gDigest.getEndPoint(), gDigest); - } - - /* - * These digests have their maxVersion set to the difference of the version - * of the local EndPointState and the version found in the GossipDigest. - */ - List diffDigests = new ArrayList(); - for ( GossipDigest gDigest : gDigestList ) - { - InetAddress ep = gDigest.getEndPoint(); - EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep); - int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion( epState ) : 0; - int diffVersion = Math.abs(version - gDigest.getMaxVersion() ); - diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) ); - } - - gDigestList.clear(); - Collections.sort(diffDigests); - int size = diffDigests.size(); - /* - * Report the digests in descending order. This takes care of the endpoints - * that are far behind w.r.t this local endpoint - */ - for ( int i = size - 1; i >= 0; --i ) - { - gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) ); - } - } - } - - public static class GossipDigestAckVerbHandler implements IVerbHandler - { - private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class); - - public void doVerb(Message message) - { - InetAddress from = message.getFrom(); - if (logger_.isTraceEnabled()) - logger_.trace("Received a GossipDigestAckMessage from " + from); - - byte[] bytes = message.getMessageBody(); - DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); - - try - { - GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis); - List gDigestList = gDigestAckMessage.getGossipDigestList(); - Map epStateMap = gDigestAckMessage.getEndPointStateMap(); - - if ( epStateMap.size() > 0 ) - { - /* Notify the Failure Detector */ - Gossiper.instance.notifyFailureDetector(epStateMap); - Gossiper.instance.applyStateLocally(epStateMap); - } - - /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */ - Map deltaEpStateMap = new HashMap(); - for( GossipDigest gDigest : gDigestList ) - { - InetAddress addr = gDigest.getEndPoint(); - EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion()); - if ( localEpStatePtr != null ) - deltaEpStateMap.put(addr, localEpStatePtr); - } - - GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap); - Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2); - if (logger_.isTraceEnabled()) - logger_.trace("Sending a GossipDigestAck2Message to " + from); - MessagingService.instance.sendOneWay(gDigestAck2Message, from); - } - catch ( IOException e ) - { - throw new RuntimeException(e); - } - } - } - - public static class GossipDigestAck2VerbHandler implements IVerbHandler - { - private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class); - - public void doVerb(Message message) - { - InetAddress from = message.getFrom(); - if (logger_.isTraceEnabled()) - logger_.trace("Received a GossipDigestAck2Message from " + from); - - byte[] bytes = message.getMessageBody(); - DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); - GossipDigestAck2Message gDigestAck2Message; - try - { - gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - Map remoteEpStateMap = gDigestAck2Message.getEndPointStateMap(); - /* Notify the Failure Detector */ - Gossiper.instance.notifyFailureDetector(remoteEpStateMap); - Gossiper.instance.applyStateLocally(remoteEpStateMap); - } - } } \ No newline at end of file Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java?rev=953373&view=auto ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java (added) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java Thu Jun 10 16:18:20 2010 @@ -0,0 +1,44 @@ +package org.apache.cassandra.gms; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.IVerbHandler; +import org.apache.cassandra.net.Message; +import org.apache.log4j.Logger; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.InetAddress; + +public class GossiperJoinVerbHandler implements IVerbHandler +{ + private static Logger logger_ = Logger.getLogger( GossiperJoinVerbHandler.class); + + public void doVerb(Message message) + { + InetAddress from = message.getFrom(); + if (logger_.isDebugEnabled()) + logger_.debug("Received a JoinMessage from " + from); + + byte[] bytes = message.getMessageBody(); + DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); + + JoinMessage joinMessage; + try + { + joinMessage = JoinMessage.serializer().deserialize(dis); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) ) + { + Gossiper.instance.join(from); + } + else + { + logger_.warn("ClusterName mismatch from " + from + " " + joinMessage.clusterId_ + "!=" + DatabaseDescriptor.getClusterName()); + } + } +} Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=953373&r1=953372&r2=953373&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java Thu Jun 10 16:18:20 2010 @@ -18,29 +18,34 @@ package org.apache.cassandra.net; -import org.apache.cassandra.concurrent.*; +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.io.SerializerType; import org.apache.cassandra.net.sink.SinkManager; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.*; -import org.cliffc.high_scale_lib.NonBlockingHashMap; - +import org.apache.cassandra.utils.ExpiringMap; +import org.apache.cassandra.utils.GuidGenerator; +import org.apache.cassandra.utils.SimpleCondition; import org.apache.log4j.Logger; +import org.cliffc.high_scale_lib.NonBlockingHashMap; import java.io.IOError; import java.io.IOException; -import java.net.ServerSocket; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.ServerSocketChannel; import java.security.MessageDigest; -import java.util.*; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -72,8 +77,9 @@ public class MessagingService implements private static Logger logger_ = Logger.getLogger(MessagingService.class); public static final MessagingService instance = new MessagingService(); - + private SocketThread socketThread; + private SimpleCondition listenGate; public Object clone() throws CloneNotSupportedException { @@ -82,7 +88,8 @@ public class MessagingService implements } protected MessagingService() - { + { + listenGate = new SimpleCondition(); verbHandlers_ = new HashMap(); /* * Leave callbacks in the cachetable long enough that any related messages will arrive @@ -103,7 +110,7 @@ public class MessagingService implements streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL"); } - + public byte[] hash(String type, byte data[]) { byte result[]; @@ -138,6 +145,19 @@ public class MessagingService implements ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort())); socketThread = new SocketThread(ss, "ACCEPT-" + localEp); socketThread.start(); + listenGate.signalAll(); + } + + public void waitUntilListening() + { + try + { + listenGate.await(); + } + catch (InterruptedException ie) + { + logger_.debug("await interrupted"); + } } public static OutboundTcpConnectionPool getConnectionPool(InetAddress to) Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=953373&r1=953372&r2=953373&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java Thu Jun 10 16:18:20 2010 @@ -222,10 +222,10 @@ public class StorageService implements I MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler()); - MessagingService.instance.registerVerbHandlers(Verb.JOIN, new Gossiper.JoinVerbHandler()); - MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new Gossiper.GossipDigestSynVerbHandler()); - MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new Gossiper.GossipDigestAckVerbHandler()); - MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new Gossiper.GossipDigestAck2VerbHandler()); + MessagingService.instance.registerVerbHandlers(Verb.JOIN, new GossiperJoinVerbHandler()); + MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler()); + MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler()); + MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler()); replicationStrategies = new HashMap(); for (String table : DatabaseDescriptor.getNonSystemTables()) @@ -287,10 +287,10 @@ public class StorageService implements I initialized = true; isClientMode = true; logger_.info("Starting up client gossip"); - MessagingService.instance.listen(FBUtilities.getLocalAddress()); + setMode("Client", false); Gossiper.instance.register(this); Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering. - setMode("Client", false); + MessagingService.instance.listen(FBUtilities.getLocalAddress()); } public synchronized void initServer() throws IOException @@ -326,16 +326,16 @@ public class StorageService implements I logger_.info("Starting up server gossip"); - MessagingService.instance.listen(FBUtilities.getLocalAddress()); - - StorageLoadBalancer.instance.startBroadcasting(); - // have to start the gossip service before we can see any info on other nodes. this is necessary // for bootstrap to get the load info it needs. // (we won't be part of the storage ring though until we add a nodeId to our state, below.) Gossiper.instance.register(this); Gossiper.instance.start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration()); // needed for node-ring gathering. + MessagingService.instance.listen(FBUtilities.getLocalAddress()); + + StorageLoadBalancer.instance.startBroadcasting(); + if (DatabaseDescriptor.isAutoBootstrap() && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) && !SystemTable.isBootstrapped())