cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gdusba...@apache.org
Subject svn commit: r953373 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
Date Thu, 10 Jun 2010 16:18:21 GMT
Author: gdusbabek
Date: Thu Jun 10 16:18:20 2010
New Revision: 953373

URL: http://svn.apache.org/viewvc?rev=953373&view=rev
Log:
restructure the startup ordering of Gossiper and MessageService to avoid timing anomalies.
patch by Matthew Dennis, reviewed by Gary Dusbabek. CASSANDRA-1160

Added:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Thu Jun 10 16:18:20 2010
@@ -16,6 +16,8 @@
    (CASSANDRA-1146)
  * use generation time to resolve node token reassignment disagreements
    (CASSANDRA-1118)
+ * restructure the startup ordering of Gossiper and MessageService to avoid
+   timing anomalies (CASSANDRA-1160)
 
 
 0.6.2

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
(added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAck2VerbHandler.java
Thu Jun 10 16:18:20 2010
@@ -0,0 +1,39 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Map;
+
+public class GossipDigestAck2VerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        InetAddress from = message.getFrom();
+        if (logger_.isTraceEnabled())
+            logger_.trace("Received a GossipDigestAck2Message from " + from);
+
+        byte[] bytes = message.getMessageBody();
+        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+        GossipDigestAck2Message gDigestAck2Message;
+        try
+        {
+            gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+        /* Notify the Failure Detector */
+        Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
+        Gossiper.instance.applyStateLocally(remoteEpStateMap);
+    }
+}

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
(added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
Thu Jun 10 16:18:20 2010
@@ -0,0 +1,63 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GossipDigestAckVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        InetAddress from = message.getFrom();
+        if (logger_.isTraceEnabled())
+            logger_.trace("Received a GossipDigestAckMessage from " + from);
+
+        byte[] bytes = message.getMessageBody();
+        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+        try
+        {
+            GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+            List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+            Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+
+            if ( epStateMap.size() > 0 )
+            {
+                /* Notify the Failure Detector */
+                Gossiper.instance.notifyFailureDetector(epStateMap);
+                Gossiper.instance.applyStateLocally(epStateMap);
+            }
+
+            /* Get the state required to send to this gossipee - construct GossipDigestAck2Message
*/
+            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress,
EndPointState>();
+            for( GossipDigest gDigest : gDigestList )
+            {
+                InetAddress addr = gDigest.getEndPoint();
+                EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr,
gDigest.getMaxVersion());
+                if ( localEpStatePtr != null )
+                    deltaEpStateMap.put(addr, localEpStatePtr);
+            }
+
+            GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
+            Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
+            if (logger_.isTraceEnabled())
+                logger_.trace("Sending a GossipDigestAck2Message to " + from);
+            MessagingService.instance.sendOneWay(gDigestAck2Message, from);
+        }
+        catch ( IOException e )
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
(added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
Thu Jun 10 16:18:20 2010
@@ -0,0 +1,102 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.*;
+
+public class GossipDigestSynVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        InetAddress from = message.getFrom();
+        if (logger_.isTraceEnabled())
+            logger_.trace("Received a GossipDigestSynMessage from " + from);
+
+        byte[] bytes = message.getMessageBody();
+        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+        try
+        {
+            GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+            /* If the message is from a different cluster throw it away. */
+            if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+            {
+                logger_.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId_
 + "!=" + DatabaseDescriptor.getClusterName());
+                return;
+            }
+
+            List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(gDigestList);
+
+            doSort(gDigestList);
+
+            List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress,
EndPointState>();
+            Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+
+            GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList,
deltaEpStateMap);
+            Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
+            if (logger_.isTraceEnabled())
+                logger_.trace("Sending a GossipDigestAckMessage to " + from);
+            MessagingService.instance.sendOneWay(gDigestAckMessage, from);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*
+     * First construct a map whose key is the endpoint in the GossipDigest and the value
is the
+     * GossipDigest itself. Then build a list of version differences i.e difference between
the
+     * version in the GossipDigest and the version in the local state for a given InetAddress.
+     * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
+     * to the endpoint from the map that was initially constructed.
+    */
+    private void doSort(List<GossipDigest> gDigestList)
+    {
+        /* Construct a map of endpoint to GossipDigest. */
+        Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress,
GossipDigest>();
+        for ( GossipDigest gDigest : gDigestList )
+        {
+            epToDigestMap.put(gDigest.getEndPoint(), gDigest);
+        }
+
+        /*
+         * These digests have their maxVersion set to the difference of the version
+         * of the local EndPointState and the version found in the GossipDigest.
+        */
+        List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
+        for ( GossipDigest gDigest : gDigestList )
+        {
+            InetAddress ep = gDigest.getEndPoint();
+            EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
+            int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion(
epState ) : 0;
+            int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
+            diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+        }
+
+        gDigestList.clear();
+        Collections.sort(diffDigests);
+        int size = diffDigests.size();
+        /*
+         * Report the digests in descending order. This takes care of the endpoints
+         * that are far behind w.r.t this local endpoint
+        */
+        for ( int i = size - 1; i >= 0; --i )
+        {
+            gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+        }
+    }
+}

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jun
10 16:18:20 2010
@@ -25,7 +25,6 @@ import java.net.InetAddress;
 
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -51,6 +50,9 @@ public class Gossiper implements IFailur
         {
             try
             {
+                //wait on messaging service to start listening
+                MessagingService.instance.waitUntilListening();
+
                 synchronized( Gossiper.instance )
                 {
                 	/* Update the local heartbeat counter. */
@@ -483,7 +485,7 @@ public class Gossiper implements IFailur
     }    
 
     /*
-     * This method is called only from the JoinVerbHandler. This happens
+     * This method is called only from the GossiperJoinVerbHandler. This happens
      * when a new node coming up multicasts the JoinMessage. Here we need
      * to add the endPoint to the list of live endpoints.
     */
@@ -873,203 +875,4 @@ public class Gossiper implements IFailur
         gossipTimer_.cancel();
         gossipTimer_ = new Timer(false); // makes the Gossiper reentrant.
     }
-
-    public static class JoinVerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
-
-        public void doVerb(Message message)
-        {
-            InetAddress from = message.getFrom();
-            if (logger_.isDebugEnabled())
-              logger_.debug("Received a JoinMessage from " + from);
-
-            byte[] bytes = message.getMessageBody();
-            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
-            JoinMessage joinMessage;
-            try
-            {
-                joinMessage = JoinMessage.serializer().deserialize(dis);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
-            {
-                Gossiper.instance.join(from);
-            }
-            else
-            {
-                logger_.warn("ClusterName mismatch from " + from + " " + joinMessage.clusterId_
 + "!=" + DatabaseDescriptor.getClusterName());
-            }
-        }
-    }
-
-    public static class GossipDigestSynVerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
-
-        public void doVerb(Message message)
-        {
-            InetAddress from = message.getFrom();
-            if (logger_.isTraceEnabled())
-                logger_.trace("Received a GossipDigestSynMessage from " + from);
-
-            byte[] bytes = message.getMessageBody();
-            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
-            try
-            {
-                GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
-                /* If the message is from a different cluster throw it away. */
-                if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName())
)
-                {
-                    logger_.warn("ClusterName mismatch from " + from + " " + gDigestMessage.clusterId_
 + "!=" + DatabaseDescriptor.getClusterName());
-                    return;
-                }
-
-                List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
-                /* Notify the Failure Detector */
-                Gossiper.instance.notifyFailureDetector(gDigestList);
-
-                doSort(gDigestList);
-
-                List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
-                Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress,
EndPointState>();
-                Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
-
-                GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList,
deltaEpStateMap);
-                Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
-                if (logger_.isTraceEnabled())
-                    logger_.trace("Sending a GossipDigestAckMessage to " + from);
-                MessagingService.instance.sendOneWay(gDigestAckMessage, from);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        /*
-         * First construct a map whose key is the endpoint in the GossipDigest and the value
is the
-         * GossipDigest itself. Then build a list of version differences i.e difference between
the
-         * version in the GossipDigest and the version in the local state for a given InetAddress.
-         * Sort this list. Now loop through the sorted list and retrieve the GossipDigest
corresponding
-         * to the endpoint from the map that was initially constructed.
-        */
-        private void doSort(List<GossipDigest> gDigestList)
-        {
-            /* Construct a map of endpoint to GossipDigest. */
-            Map<InetAddress, GossipDigest> epToDigestMap = new HashMap<InetAddress,
GossipDigest>();
-            for ( GossipDigest gDigest : gDigestList )
-            {
-                epToDigestMap.put(gDigest.getEndPoint(), gDigest);
-            }
-
-            /*
-             * These digests have their maxVersion set to the difference of the version
-             * of the local EndPointState and the version found in the GossipDigest.
-            */
-            List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
-            for ( GossipDigest gDigest : gDigestList )
-            {
-                InetAddress ep = gDigest.getEndPoint();
-                EndPointState epState = Gossiper.instance.getEndPointStateForEndPoint(ep);
-                int version = (epState != null) ? Gossiper.instance.getMaxEndPointStateVersion(
epState ) : 0;
-                int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
-                diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion)
);
-            }
-
-            gDigestList.clear();
-            Collections.sort(diffDigests);
-            int size = diffDigests.size();
-            /*
-             * Report the digests in descending order. This takes care of the endpoints
-             * that are far behind w.r.t this local endpoint
-            */
-            for ( int i = size - 1; i >= 0; --i )
-            {
-                gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
-            }
-        }
-    }
-
-    public static class GossipDigestAckVerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
-
-        public void doVerb(Message message)
-        {
-            InetAddress from = message.getFrom();
-            if (logger_.isTraceEnabled())
-                logger_.trace("Received a GossipDigestAckMessage from " + from);
-
-            byte[] bytes = message.getMessageBody();
-            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
-            try
-            {
-                GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
-                List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
-                Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
-
-                if ( epStateMap.size() > 0 )
-                {
-                    /* Notify the Failure Detector */
-                    Gossiper.instance.notifyFailureDetector(epStateMap);
-                    Gossiper.instance.applyStateLocally(epStateMap);
-                }
-
-                /* Get the state required to send to this gossipee - construct GossipDigestAck2Message
*/
-                Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress,
EndPointState>();
-                for( GossipDigest gDigest : gDigestList )
-                {
-                    InetAddress addr = gDigest.getEndPoint();
-                    EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr,
gDigest.getMaxVersion());
-                    if ( localEpStatePtr != null )
-                        deltaEpStateMap.put(addr, localEpStatePtr);
-                }
-
-                GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
-                Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
-                if (logger_.isTraceEnabled())
-                    logger_.trace("Sending a GossipDigestAck2Message to " + from);
-                MessagingService.instance.sendOneWay(gDigestAck2Message, from);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    public static class GossipDigestAck2VerbHandler implements IVerbHandler
-    {
-        private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
-
-        public void doVerb(Message message)
-        {
-            InetAddress from = message.getFrom();
-            if (logger_.isTraceEnabled())
-                logger_.trace("Received a GossipDigestAck2Message from " + from);
-
-            byte[] bytes = message.getMessageBody();
-            DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-            GossipDigestAck2Message gDigestAck2Message;
-            try
-            {
-                gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-            Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
-            Gossiper.instance.applyStateLocally(remoteEpStateMap);
-        }
-    }
 }
\ No newline at end of file

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java?rev=953373&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java
(added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/gms/GossiperJoinVerbHandler.java
Thu Jun 10 16:18:20 2010
@@ -0,0 +1,44 @@
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.log4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+
+public class GossiperJoinVerbHandler implements IVerbHandler
+{
+    private static Logger logger_ = Logger.getLogger( GossiperJoinVerbHandler.class);
+
+    public void doVerb(Message message)
+    {
+        InetAddress from = message.getFrom();
+        if (logger_.isDebugEnabled())
+          logger_.debug("Received a JoinMessage from " + from);
+
+        byte[] bytes = message.getMessageBody();
+        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+        JoinMessage joinMessage;
+        try
+        {
+            joinMessage = JoinMessage.serializer().deserialize(dis);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+        if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+        {
+            Gossiper.instance.join(from);
+        }
+        else
+        {
+            logger_.warn("ClusterName mismatch from " + from + " " + joinMessage.clusterId_
 + "!=" + DatabaseDescriptor.getClusterName());
+        }
+    }
+}

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/MessagingService.java
Thu Jun 10 16:18:20 2010
@@ -18,29 +18,34 @@
 
 package org.apache.cassandra.net;
 
-import org.apache.cassandra.concurrent.*;
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.IFailureDetectionEventListener;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
+import org.apache.cassandra.utils.ExpiringMap;
+import org.apache.cassandra.utils.GuidGenerator;
+import org.apache.cassandra.utils.SimpleCondition;
 import org.apache.log4j.Logger;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import java.io.IOError;
 import java.io.IOException;
-import java.net.ServerSocket;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ServerSocketChannel;
 import java.security.MessageDigest;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -72,8 +77,9 @@ public class MessagingService implements
     private static Logger logger_ = Logger.getLogger(MessagingService.class);
     
     public static final MessagingService instance = new MessagingService();
-    
+
     private SocketThread socketThread;
+    private SimpleCondition listenGate;
 
     public Object clone() throws CloneNotSupportedException
     {
@@ -82,7 +88,8 @@ public class MessagingService implements
     }
 
     protected MessagingService()
-    {        
+    {
+        listenGate = new SimpleCondition();
         verbHandlers_ = new HashMap<StorageService.Verb, IVerbHandler>();
         /*
          * Leave callbacks in the cachetable long enough that any related messages will arrive
@@ -103,7 +110,7 @@ public class MessagingService implements
 
         streamExecutor_ = new JMXEnabledThreadPoolExecutor("MESSAGE-STREAMING-POOL");
     }
-    
+
     public byte[] hash(String type, byte data[])
     {
         byte result[];
@@ -138,6 +145,19 @@ public class MessagingService implements
         ss.bind(new InetSocketAddress(localEp, DatabaseDescriptor.getStoragePort()));
         socketThread = new SocketThread(ss, "ACCEPT-" + localEp);
         socketThread.start();
+        listenGate.signalAll();
+    }
+
+    public void waitUntilListening()
+    {
+        try
+        {
+            listenGate.await();
+        }
+        catch (InterruptedException ie)
+        {
+            logger_.debug("await interrupted");
+        }
     }
 
     public static OutboundTcpConnectionPool getConnectionPool(InetAddress to)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=953373&r1=953372&r2=953373&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Thu Jun 10 16:18:20 2010
@@ -222,10 +222,10 @@ public class StorageService implements I
         MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler());
         MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler());
 
-        MessagingService.instance.registerVerbHandlers(Verb.JOIN, new Gossiper.JoinVerbHandler());
-        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new Gossiper.GossipDigestSynVerbHandler());
-        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new Gossiper.GossipDigestAckVerbHandler());
-        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new Gossiper.GossipDigestAck2VerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.JOIN, new GossiperJoinVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
+        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
 
         replicationStrategies = new HashMap<String, AbstractReplicationStrategy>();
         for (String table : DatabaseDescriptor.getNonSystemTables())
@@ -287,10 +287,10 @@ public class StorageService implements I
         initialized = true;
         isClientMode = true;
         logger_.info("Starting up client gossip");
-        MessagingService.instance.listen(FBUtilities.getLocalAddress());
+        setMode("Client", false);
         Gossiper.instance.register(this);
         Gossiper.instance.start(FBUtilities.getLocalAddress(), (int)(System.currentTimeMillis()
/ 1000)); // needed for node-ring gathering.
-        setMode("Client", false);
+        MessagingService.instance.listen(FBUtilities.getLocalAddress());
     }
 
     public synchronized void initServer() throws IOException
@@ -326,16 +326,16 @@ public class StorageService implements I
 
         logger_.info("Starting up server gossip");
 
-        MessagingService.instance.listen(FBUtilities.getLocalAddress());
-
-        StorageLoadBalancer.instance.startBroadcasting();
-
         // have to start the gossip service before we can see any info on other nodes.  this
is necessary
         // for bootstrap to get the load info it needs.
         // (we won't be part of the storage ring though until we add a nodeId to our state,
below.)
         Gossiper.instance.register(this);
         Gossiper.instance.start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration());
// needed for node-ring gathering.
 
+        MessagingService.instance.listen(FBUtilities.getLocalAddress());
+
+        StorageLoadBalancer.instance.startBroadcasting();
+
         if (DatabaseDescriptor.isAutoBootstrap()
                 && DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress())
                 && !SystemTable.isBootstrapped())



Mime
View raw message