cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From brandonwilli...@apache.org
Subject [2/6] cassandra git commit: Add shutdown gossip state to prevent timeouts during rolling restarts
Date Wed, 15 Apr 2015 14:34:57 GMT
Add shutdown gossip state to prevent timeouts during rolling restarts

Patch by brandonwilliams, reviewed by Richard Low for CASSANDRA-8336


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2c62bb2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2c62bb2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2c62bb2

Branch: refs/heads/cassandra-2.1
Commit: b2c62bb20be52a698a5683ef8ffcdefe560dbc9a
Parents: 53848f7
Author: Brandon Williams <brandonwilliams@apache.org>
Authored: Wed Apr 15 09:30:12 2015 -0500
Committer: Brandon Williams <brandonwilliams@apache.org>
Committed: Wed Apr 15 09:30:12 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipShutdownVerbHandler.java          |  2 +-
 src/java/org/apache/cassandra/gms/Gossiper.java | 86 ++++++++++++++++++--
 .../apache/cassandra/gms/HeartBeatState.java    |  5 ++
 .../apache/cassandra/gms/VersionedValue.java    |  6 ++
 .../cassandra/service/StorageService.java       | 17 ++--
 6 files changed, 102 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 521668d..460b07c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.15:
+ * Add shutdown gossip state to prevent timeouts during rolling restarts (CASSANDRA-8336)
  * Fix running with java.net.preferIPv6Addresses=true (CASSANDRA-9137)
  * Fix failed bootstrap/replace attempts being persisted in system.peers (CASSANDRA-9180)
  * Flush system.IndexInfo after marking index built (CASSANDRA-9128)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
index ef71208..1691107 100644
--- a/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipShutdownVerbHandler.java
@@ -34,7 +34,7 @@ public class GossipShutdownVerbHandler implements IVerbHandler
             logger.debug("Ignoring shutdown message from {} because gossip is disabled", message.from);
             return;
         }
-        FailureDetector.instance.forceConviction(message.from);
+        Gossiper.instance.markAsShutdown(message.from);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 962a358..090033e 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -71,6 +71,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     static final ApplicationState[] STATES = ApplicationState.values();
     static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN,
                                                           VersionedValue.STATUS_LEFT, VersionedValue.HIBERNATE);
+    // Own mutable copy: DEAD_STATES is a fixed-size Arrays.asList view, so add() on it throws and would alias it.
+    static List<String> SILENT_SHUTDOWN_STATES = new java.util.ArrayList<String>(DEAD_STATES);
+    static {
+        SILENT_SHUTDOWN_STATES.add(VersionedValue.STATUS_BOOTSTRAPPING);
+    }
 
     private ScheduledFuture<?> scheduledGossipTask;
     private static final ReentrantLock taskLock = new ReentrantLock();
@@ -297,6 +302,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             return 0L;
     }
 
+    private boolean isShutdown(InetAddress endpoint)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        if (epState == null)
+            return false;
+        if (epState.getApplicationState(ApplicationState.STATUS) == null)
+            return false;
+        String value = epState.getApplicationState(ApplicationState.STATUS).value;
+        String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+        assert (pieces.length > 0);
+        String state = pieces[0];
+        return state.equals(VersionedValue.SHUTDOWN);
+    }
+
     /**
      * This method is part of IFailureDetectionEventListener interface. This is invoked
      * by the Failure Detector when it convicts an end point.
@@ -308,7 +327,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         EndpointState epState = endpointStateMap.get(endpoint);
         if (epState == null)
             return;
-        if (epState.isAlive() && !isDeadState(epState))
+        if (isShutdown(endpoint) && epState.isAlive())
+        {
+            markAsShutdown(endpoint);
+        }
+        else if (epState.isAlive() && !isDeadState(epState))
         {
             markDead(endpoint, epState);
         }
@@ -317,6 +340,21 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
+     * This method is used to mark a node as shutdown; that is it gracefully exited on its own and told us about it
+     * @param endpoint endpoint that has shut itself down
+     */
+    protected void markAsShutdown(InetAddress endpoint)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        if (epState == null)
+            return;
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
+        epState.getHeartBeatState().forceHighestPossibleVersionUnsafe();
+        markDead(endpoint, epState);
+        FailureDetector.instance.forceConviction(endpoint);
+    }
+
+    /**
      * Return either: the greatest heartbeat or application state
      *
      * @param epState
@@ -963,6 +1001,9 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         }
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
             subscriber.onJoin(ep, epState);
+        // check this at the end so nodes will learn about the endpoint
+        if (isShutdown(ep))
+            markAsShutdown(ep);
     }
 
     public boolean isDeadState(EndpointState epState)
@@ -981,6 +1022,22 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return false;
     }
 
+    public boolean isSilentShutdownState(EndpointState epState)
+    {
+        if (epState.getApplicationState(ApplicationState.STATUS) == null)
+            return false;
+        String value = epState.getApplicationState(ApplicationState.STATUS).value;
+        String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+        assert (pieces.length > 0);
+        String state = pieces[0];
+        for (String deadstate : SILENT_SHUTDOWN_STATES)
+        {
+            if (state.equals(deadstate))
+                return true;
+        }
+        return false;
+    }
+
     void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
     {
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
@@ -1264,6 +1321,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         endpointStateMap.putIfAbsent(FBUtilities.getBroadcastAddress(), localState);
     }
 
+    public void forceNewerGeneration()
+    {
+        EndpointState epstate = endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        epstate.getHeartBeatState().forceNewerGenerationUnsafe();
+    }
+
 
     /**
      * Add an endpoint we knew about previously, but whose state is unknown
@@ -1330,13 +1393,20 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     public void stop()
     {
-    	if (scheduledGossipTask != null)
-    		scheduledGossipTask.cancel(false);
-        logger.info("Announcing shutdown");
-        Uninterruptibles.sleepUninterruptibly(intervalInMillis * 2, TimeUnit.MILLISECONDS);
-        MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN);
-        for (InetAddress ep : liveEndpoints)
-            MessagingService.instance().sendOneWay(message, ep);
+        EndpointState mystate = endpointStateMap.get(FBUtilities.getBroadcastAddress());
+        if (mystate != null && !isSilentShutdownState(mystate))
+        {
+            logger.info("Announcing shutdown");
+            addLocalApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true));
+            MessageOut message = new MessageOut(MessagingService.Verb.GOSSIP_SHUTDOWN);
+            for (InetAddress ep : liveEndpoints)
+                MessagingService.instance().sendOneWay(message, ep);
+            Uninterruptibles.sleepUninterruptibly(Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000), TimeUnit.MILLISECONDS);
+        }
+        else
+            logger.warn("No local state or state is in silent shutdown, not announcing shutdown");
+        if (scheduledGossipTask != null)
+            scheduledGossipTask.cancel(false);
     }
 
     public boolean isEnabled()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/HeartBeatState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/HeartBeatState.java b/src/java/org/apache/cassandra/gms/HeartBeatState.java
index c3b423c..4af5dd8 100644
--- a/src/java/org/apache/cassandra/gms/HeartBeatState.java
+++ b/src/java/org/apache/cassandra/gms/HeartBeatState.java
@@ -63,6 +63,11 @@ class HeartBeatState
         generation += 1;
     }
 
+    void forceHighestPossibleVersionUnsafe()
+    {
+        version = Integer.MAX_VALUE;
+    }
+
     public String toString()
     {
         return String.format("HeartBeat: generation = %d, version = %d", generation, version);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 565a8cb..b0918ac 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -69,6 +69,7 @@ public class VersionedValue implements Comparable<VersionedValue>
     public final static String REMOVED_TOKEN = "removed";
 
     public final static String HIBERNATE = "hibernate";
+    public final static String SHUTDOWN = "shutdown";
 
     // values for ApplicationState.REMOVAL_COORDINATOR
     public final static String REMOVAL_COORDINATOR = "REMOVER";
@@ -207,6 +208,11 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(VersionedValue.HIBERNATE + VersionedValue.DELIMITER + value);
         }
 
+        public VersionedValue shutdown(boolean value)
+        {
+            return new VersionedValue(VersionedValue.SHUTDOWN + VersionedValue.DELIMITER + value);
+        }
+
         public VersionedValue datacenter(String dcId)
         {
             return new VersionedValue(dcId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2c62bb2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e906f03..077413f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -37,12 +37,10 @@ import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
-import com.google.common.util.concurrent.AtomicDouble;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
@@ -205,11 +203,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         SystemKeyspace.updateTokens(tokens);
         tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         Collection<Token> localTokens = getLocalTokens();
+        setGossipTokens(localTokens);
+        setMode(Mode.NORMAL, false);
+    }
+
+    public void setGossipTokens(Collection<Token> tokens)
+    {
         List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>();
-        states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(localTokens)));
-        states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(localTokens)));
+        states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens)));
+        states.add(Pair.create(ApplicationState.STATUS, valueFactory.normal(tokens)));
         Gossiper.instance.addLocalApplicationStates(states);
-        setMode(Mode.NORMAL, false);
     }
 
     public StorageService()
@@ -289,6 +292,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (!initialized)
         {
             logger.warn("Starting gossip by operator request");
+            setGossipTokens(getLocalTokens());
+            Gossiper.instance.forceNewerGeneration();
             Gossiper.instance.start((int) (System.currentTimeMillis() / 1000));
             initialized = true;
         }
@@ -1346,7 +1351,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
             if (moveName.equals(VersionedValue.STATUS_BOOTSTRAPPING))
                 handleStateBootstrap(endpoint, pieces);
-            else if (moveName.equals(VersionedValue.STATUS_NORMAL))
+            else if (moveName.equals(VersionedValue.STATUS_NORMAL) || moveName.equals(VersionedValue.SHUTDOWN))
                 handleStateNormal(endpoint, pieces);
             else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
                 handleStateRemoving(endpoint, pieces);


Mime
View raw message