cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject cassandra git commit: Always check for collisions before joining ring
Date Wed, 27 Apr 2016 08:39:06 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 0e5266664 -> 2bc5f0c61


Always check for collisions before joining ring

Patch by Sam Tunnicliffe; reviewed by Joel Knighton for CASSANDRA-10134

The collision check and shadow round can be skipped entirely (e.g. for
testing) by setting cassandra.allow_unsafe_join=true.

This commit also allows a node to be replaced without bootstrapping, by
setting both auto_bootstrap=false and cassandra.replace_address. Because
this is unsafe, it must be explicitly enabled with
cassandra.allow_unsafe_replace=true.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2bc5f0c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2bc5f0c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2bc5f0c6

Branch: refs/heads/trunk
Commit: 2bc5f0c61ddb428b4826d83d42dad473eaeac002
Parents: 0e52666
Author: Sam Tunnicliffe <sam@beobal.com>
Authored: Wed Mar 16 09:53:04 2016 +0000
Committer: Sam Tunnicliffe <sam@beobal.com>
Committed: Wed Apr 27 09:25:33 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   4 +
 .../apache/cassandra/db/view/ViewBuilder.java   |   1 +
 .../apache/cassandra/db/view/ViewManager.java   |  16 ++
 .../gms/GossipDigestAckVerbHandler.java         |   5 +-
 .../gms/GossipDigestSynVerbHandler.java         |  28 ++-
 src/java/org/apache/cassandra/gms/Gossiper.java |  92 ++++++++--
 .../org/apache/cassandra/io/util/FileUtils.java |   4 +-
 .../locator/DynamicEndpointSnitch.java          |   2 +-
 .../cassandra/service/StorageService.java       | 172 ++++++++++++-------
 .../cassandra/service/StorageServiceMBean.java  |   2 +-
 .../apache/cassandra/tools/nodetool/Info.java   |   2 +-
 .../cassandra/utils/JVMStabilityInspector.java  |   4 +-
 .../unit/org/apache/cassandra/SchemaLoader.java |   2 +
 14 files changed, 244 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2bbd39d..f37a8ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.6
+ * Always perform collision check before joining ring (CASSANDRA-10134)
  * SSTableWriter output discrepancy (CASSANDRA-11646)
  * Fix potential timeout in NativeTransportService.testConcurrentDestroys (CASSANDRA-10756)
  * Support large partitions on the 3.0 sstable format (CASSANDRA-11206)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index a177d37..7f24d2c 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -18,6 +18,10 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+   - Collision checks are performed when joining the token ring, regardless of whether
+     the node should bootstrap. Additionally, replace_address can legitimately be used
+     without bootstrapping to help with recovery of nodes with partially failed disks.
+     See CASSANDRA-10134 for more details.
    - Key cache will only hold indexed entries up to the size configured by
      column_index_cache_size_in_kb in cassandra.yaml in memory. Larger indexed entries
      will never go into memory. See CASSANDRA-11206 for more details.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 23eeba4..8944122 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -103,6 +103,7 @@ public class ViewBuilder extends CompactionInfo.Holder
 
     public void run()
     {
+        logger.trace("Running view builder for {}.{}", baseCfs.metadata.ksName, view.name);
         UUID localHostId = SystemKeyspace.getLocalHostId();
         String ksname = baseCfs.metadata.ksName, viewName = view.name;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/db/view/ViewManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewManager.java b/src/java/org/apache/cassandra/db/view/ViewManager.java
index faa5551..37428ad 100644
--- a/src/java/org/apache/cassandra/db/view/ViewManager.java
+++ b/src/java/org/apache/cassandra/db/view/ViewManager.java
@@ -35,6 +35,8 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.repair.SystemDistributedKeyspace;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -212,6 +214,20 @@ public class ViewManager
                 addView(entry.getValue());
         }
 
+        // Building views involves updating view build status in the system_distributed
+        // keyspace and therefore it requires ring information. This check prevents builds
+        // being submitted when Keyspaces are initialized during CassandraDaemon::setup as
+        // that happens before StorageService & gossip are initialized. After SS has
been
+        // init'd we schedule builds for *all* views anyway, so this doesn't have any effect
+        // on startup. It does mean however, that builds will not be triggered if gossip
is
+        // disabled via JMX or nodetool as that sets SS to an uninitialized state.
+        if (!StorageService.instance.isInitialized())
+        {
+            logger.info("Not submitting build tasks for views in keyspace {} as " +
+                        "storage service is not initialized", keyspace.getName());
+            return;
+        }
+
         for (View view : allViews())
         {
             view.build();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..15662b1 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -61,8 +61,9 @@ public class GossipDigestAckVerbHandler implements IVerbHandler<GossipDigestAck>
         if (Gossiper.instance.isInShadowRound())
         {
             if (logger.isDebugEnabled())
-                logger.debug("Finishing shadow round with {}", from);
-            Gossiper.instance.finishShadowRound();
+                logger.debug("Received an ack from {}, which may trigger exit from shadow
round", from);
+            // if the ack is completely empty, then we can infer that the respondent is also
in a shadow round
+            Gossiper.instance.maybeFinishShadowRound(from, gDigestList.isEmpty() &&
epStateMap.isEmpty());
             return; // don't bother doing anything else, we have what we came for
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
index 1c67570..6d0afa2 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java
@@ -38,7 +38,7 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn>
         InetAddress from = message.from;
         if (logger.isTraceEnabled())
             logger.trace("Received a GossipDigestSynMessage from {}", from);
-        if (!Gossiper.instance.isEnabled())
+        if (!Gossiper.instance.isEnabled() && !Gossiper.instance.isInShadowRound())
         {
             if (logger.isTraceEnabled())
                 logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled");
@@ -60,6 +60,32 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn>
         }
 
         List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+
+        // if the syn comes from a peer performing a shadow round and this node is
+        // also currently in a shadow round, send back a minimal ack. This node must
+        // be in the sender's seed list and doing this allows the sender to
+        // differentiate between seeds from which it is partitioned and those which
+        // are in their shadow round
+        if (!Gossiper.instance.isEnabled() && Gossiper.instance.isInShadowRound())
+        {
+            // a genuine syn (as opposed to one from a node currently
+            // doing a shadow round) will always contain > 0 digests
+            if (gDigestList.size() > 0)
+            {
+                logger.debug("Ignoring non-empty GossipDigestSynMessage because currently
in gossip shadow round");
+                return;
+            }
+
+            logger.debug("Received a shadow round syn from {}. Gossip is disabled but " +
+                         "currently also in shadow round, responding with a minimal ack",
from);
+            MessagingService.instance()
+                            .sendOneWay(new MessageOut<>(MessagingService.Verb.GOSSIP_DIGEST_ACK,
+                                                         new GossipDigestAck(new ArrayList<>(),
new HashMap<>()),
+                                                         GossipDigestAck.serializer),
+                                        from);
+            return;
+        }
+
         if (logger.isTraceEnabled())
         {
             StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 6f63727..76e7577 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -123,6 +123,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private final Map<InetAddress, Long> expireTimeEndpointMap = new ConcurrentHashMap<InetAddress,
Long>();
 
     private volatile boolean inShadowRound = false;
+    private final Set<InetAddress> seedsInShadowRound = new ConcurrentSkipListSet<>(inetcomparator);
 
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
@@ -708,27 +709,46 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
-     * Check if this endpoint can safely bootstrap into the cluster.
+     * Check if this node can safely be started and join the ring.
+     * If the node is bootstrapping, examines gossip state for any previous status to decide
whether
+     * it's safe to allow this node to start & bootstrap. If not bootstrapping, compares
the host ID
+     * that the node itself has (obtained by reading from system.local or generated if not
present)
+     * with the host ID obtained from gossip for the endpoint address (if any). This latter
case
+     * prevents a non-bootstrapping, new node from being started with the same address of
a
+     * previously started, but currently down predecessor.
      *
      * @param endpoint - the endpoint to check
-     * @return true if the endpoint can join the cluster
+     * @param localHostUUID - the host id to check
+     * @param isBootstrapping - whether the node intends to bootstrap when joining
+     * @return true if it is safe to start the node, false otherwise
      */
-    public boolean isSafeForBootstrap(InetAddress endpoint)
+    public boolean isSafeForStartup(InetAddress endpoint, UUID localHostUUID, boolean isBootstrapping)
     {
         EndpointState epState = endpointStateMap.get(endpoint);
-
         // if there's no previous state, or the node was previously removed from the cluster,
we're good
         if (epState == null || isDeadState(epState))
             return true;
 
-        String status = getGossipStatus(epState);
-
-        // these states are not allowed to join the cluster as it would not be safe
-        final List<String> unsafeStatuses = new ArrayList<String>() {{
-            add(""); // failed bootstrap but we did start gossiping
-            add(VersionedValue.STATUS_NORMAL); // node is legit in the cluster or it was
stopped with kill -9
-            add(VersionedValue.SHUTDOWN); }}; // node was shutdown
-        return !unsafeStatuses.contains(status);
+        if (isBootstrapping)
+        {
+            String status = getGossipStatus(epState);
+            // these states are not allowed to join the cluster as it would not be safe
+            final List<String> unsafeStatuses = new ArrayList<String>()
+            {{
+                add("");                           // failed bootstrap but we did start gossiping
+                add(VersionedValue.STATUS_NORMAL); // node is legit in the cluster or it
was stopped with kill -9
+                add(VersionedValue.SHUTDOWN);      // node was shutdown
+            }};
+            return !unsafeStatuses.contains(status);
+        }
+        else
+        {
+            // if the previous UUID matches what we currently have (i.e. what was read from
+            // system.local at startup), then we're good to start up. Otherwise, something
+            // is amiss and we need to replace the previous node
+            VersionedValue previous = epState.getApplicationState(ApplicationState.HOST_ID);
+            return UUID.fromString(previous.value).equals(localHostUUID);
+        }
     }
 
     private void doStatusCheck()
@@ -1318,11 +1338,19 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     /**
      *  Do a single 'shadow' round of gossip, where we do not modify any state
-     *  Only used when replacing a node, to get and assume its states
+     *  Used when preparing to join the ring:
+     *      * when replacing a node, to get and assume its tokens
+     *      * when joining, to check that the local host id matches any previous id for the
endpoint address
      */
     public void doShadowRound()
     {
         buildSeedsList();
+        // it may be that the local address is the only entry in the seed
+        // list in which case, attempting a shadow round is pointless
+        if (seeds.isEmpty())
+            return;
+
+        seedsInShadowRound.clear();
         // send a completely empty syn
         List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
         GossipDigestSyn digestSynMessage = new GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1341,6 +1369,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                 if (slept % 5000 == 0)
                 { // CASSANDRA-8072, retry at the beginning and every 5 seconds
                     logger.trace("Sending shadow round GOSSIP DIGEST SYN to seeds {}", seeds);
+
                     for (InetAddress seed : seeds)
                         MessagingService.instance().sendOneWay(message, seed);
                 }
@@ -1351,7 +1380,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
                 slept += 1000;
                 if (slept > StorageService.RING_DELAY)
-                    throw new RuntimeException("Unable to gossip with any seeds");
+                {
+                    // if we don't consider ourself to be a seed, fail out
+                    if (!DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress()))
+                        throw new RuntimeException("Unable to gossip with any seeds");
+
+                    logger.warn("Unable to gossip with any seeds but continuing since node
is in its own seed list");
+                    inShadowRound = false;
+                    break;
+                }
             }
         }
         catch (InterruptedException wtf)
@@ -1478,10 +1515,33 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && (!scheduledGossipTask.isCancelled());
     }
 
-    protected void finishShadowRound()
+    protected void maybeFinishShadowRound(InetAddress respondent, boolean isInShadowRound)
     {
         if (inShadowRound)
-            inShadowRound = false;
+        {
+            if (!isInShadowRound)
+            {
+                logger.debug("Received a regular ack from {}, can now exit shadow round",
respondent);
+                // respondent sent back a full ack, so we can exit our shadow round
+                inShadowRound = false;
+                seedsInShadowRound.clear();
+            }
+            else
+            {
+                // respondent indicates it too is in a shadow round, if all seeds
+                // are in this state then we can exit our shadow round. Otherwise,
+                // we keep retrying the SR until one responds with a full ACK or
+                // we learn that all seeds are in SR.
+                logger.debug("Received an ack from {} indicating it is also in shadow round",
respondent);
+                seedsInShadowRound.add(respondent);
+                if (seedsInShadowRound.containsAll(seeds))
+                {
+                    logger.debug("All seeds are in a shadow round, clearing this node to
exit its own");
+                    inShadowRound = false;
+                    seedsInShadowRound.clear();
+                }
+            }
+        }
     }
 
     protected boolean isInShadowRound()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index a076bbd..6b58e85 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -463,7 +463,7 @@ public class FileUtils
 
     public static void handleCorruptSSTable(CorruptSSTableException e)
     {
-        if (!StorageService.instance.isSetupCompleted())
+        if (!StorageService.instance.isDaemonSetupCompleted())
             handleStartupFSError(e);
 
         JVMStabilityInspector.inspectThrowable(e);
@@ -477,7 +477,7 @@ public class FileUtils
     
     public static void handleFSError(FSError e)
     {
-        if (!StorageService.instance.isSetupCompleted())
+        if (!StorageService.instance.isDaemonSetupCompleted())
             handleStartupFSError(e);
 
         JVMStabilityInspector.inspectThrowable(e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 3e89dd4..fb0db13 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -253,7 +253,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements
ILa
 
     private void updateScores() // this is expensive
     {
-        if (!StorageService.instance.isInitialized()) 
+        if (!StorageService.instance.isGossipActive())
             return;
         if (!registered)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 56d9b4a..d4ad59a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -251,8 +251,9 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
     /* true if node is rebuilding and receiving data */
     private final AtomicBoolean isRebuilding = new AtomicBoolean();
 
-    private boolean initialized;
+    private volatile boolean initialized = false;
     private volatile boolean joined = false;
+    private volatile boolean gossipActive = false;
 
     /* the probability for tracing any particular request, 0 disables tracing and 1 enables
for all */
     private double traceProbability = 0.0;
@@ -369,24 +370,24 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
     // should only be called via JMX
     public void stopGossiping()
     {
-        if (initialized)
+        if (gossipActive)
         {
             logger.warn("Stopping gossip by operator request");
             Gossiper.instance.stop();
-            initialized = false;
+            gossipActive = false;
         }
     }
 
     // should only be called via JMX
     public void startGossiping()
     {
-        if (!initialized)
+        if (!gossipActive)
         {
             logger.warn("Starting gossip by operator request");
             setGossipTokens(getLocalTokens());
             Gossiper.instance.forceNewerGeneration();
             Gossiper.instance.start((int) (System.currentTimeMillis() / 1000));
-            initialized = true;
+            gossipActive = true;
         }
     }
 
@@ -462,7 +463,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
 
     public void stopTransports()
     {
-        if (isInitialized())
+        if (isGossipActive())
         {
             logger.error("Stopping gossiper");
             stopGossiping();
@@ -500,7 +501,12 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         return initialized;
     }
 
-    public boolean isSetupCompleted()
+    public boolean isGossipActive()
+    {
+        return gossipActive;
+    }
+
+    public boolean isDaemonSetupCompleted()
     {
         return daemon == null
                ? false
@@ -514,50 +520,56 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         daemon.deactivate();
     }
 
-    public synchronized Collection<Token> prepareReplacementInfo() throws ConfigurationException
+    private synchronized UUID prepareReplacementInfo(InetAddress replaceAddress) throws ConfigurationException
     {
         logger.info("Gathering node replacement information for {}", DatabaseDescriptor.getReplaceAddress());
-        if (!MessagingService.instance().isListening())
-            MessagingService.instance().listen();
-
-        // make magic happen
         Gossiper.instance.doShadowRound();
+        // as we've completed the shadow round of gossip, we should be able to find the node
we're replacing
+        if (Gossiper.instance.getEndpointStateForEndpoint(replaceAddress) == null)
+            throw new RuntimeException(String.format("Cannot replace_address %s because it
doesn't exist in gossip", replaceAddress));
 
-        UUID hostId = null;
-        // now that we've gossiped at least once, we should be able to find the node we're
replacing
-        if (Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())==
null)
-            throw new RuntimeException("Cannot replace_address " + DatabaseDescriptor.getReplaceAddress()
+ " because it doesn't exist in gossip");
-        hostId = Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
         try
         {
-            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            VersionedValue tokensVersionedValue = Gossiper.instance.getEndpointStateForEndpoint(replaceAddress).getApplicationState(ApplicationState.TOKENS);
             if (tokensVersionedValue == null)
-                throw new RuntimeException("Could not find tokens for " + DatabaseDescriptor.getReplaceAddress()
+ " to replace");
-            Collection<Token> tokens = TokenSerializer.deserialize(tokenMetadata.partitioner,
new DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
+                throw new RuntimeException(String.format("Could not find tokens for %s to
replace", replaceAddress));
 
-            SystemKeyspace.setLocalHostId(hostId); // use the replacee's host Id as our own
so we receive hints, etc
-            Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we
need
-            return tokens;
+            bootstrapTokens = TokenSerializer.deserialize(tokenMetadata.partitioner, new
DataInputStream(new ByteArrayInputStream(tokensVersionedValue.toBytes())));
         }
         catch (IOException e)
         {
             throw new RuntimeException(e);
         }
+
+        // we'll use the replacee's host Id as our own so we receive hints, etc
+        UUID localHostId = Gossiper.instance.getHostId(replaceAddress);
+        SystemKeyspace.setLocalHostId(localHostId);
+        Gossiper.instance.resetEndpointStateMap(); // clean up since we have what we need
+        return localHostId;
     }
 
-    public synchronized void checkForEndpointCollision() throws ConfigurationException
+    private synchronized void checkForEndpointCollision(UUID localHostId) throws ConfigurationException
     {
+        if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
+        {
+            logger.warn("Skipping endpoint collision check as cassandra.allow_unsafe_join=true");
+            return;
+        }
+
         logger.debug("Starting shadow gossip round to check for endpoint collision");
-        if (!MessagingService.instance().isListening())
-            MessagingService.instance().listen();
         Gossiper.instance.doShadowRound();
-        if (!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+        // If bootstrapping, check whether any previously known status for the endpoint makes
it unsafe to do so.
+        // If not bootstrapping, compare the host id for this endpoint learned from gossip
(if any) with the local
+        // one, which was either read from system.local or generated at startup. If a learned
id is present &
+        // doesn't match the local, then the node needs replacing
+        if (!Gossiper.instance.isSafeForStartup(FBUtilities.getBroadcastAddress(), localHostId,
shouldBootstrap()))
         {
             throw new RuntimeException(String.format("A node with address %s already exists,
cancelling join. " +
                                                      "Use cassandra.replace_address if you
want to replace this node.",
                                                      FBUtilities.getBroadcastAddress()));
         }
-        if (useStrictConsistency && !allowSimultaneousMoves())
+
+        if (shouldBootstrap() && useStrictConsistency && !allowSimultaneousMoves())
         {
             for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates())
             {
@@ -571,6 +583,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
                     throw new UnsupportedOperationException("Other bootstrapping/leaving/moving
nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true");
             }
         }
+        logger.debug("Resetting gossip state after shadow round");
         Gossiper.instance.resetEndpointStateMap();
     }
 
@@ -583,6 +596,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
     public void unsafeInitialize() throws ConfigurationException
     {
         initialized = true;
+        gossipActive = true;
         Gossiper.instance.register(this);
         Gossiper.instance.start((int) (System.currentTimeMillis() / 1000)); // needed for
node-ring gathering.
         Gossiper.instance.addLocalApplicationState(ApplicationState.NET_VERSION, valueFactory.networkVersion());
@@ -617,8 +631,6 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         logger.info("CQL supported versions: {} (default: {})",
                 StringUtils.join(ClientState.getCQLSupportedVersion(), ","), ClientState.DEFAULT_CQL_VERSION);
 
-        initialized = true;
-
         try
         {
             // Ensure StorageProxy is initialized on start-up; see CASSANDRA-3797.
@@ -631,27 +643,6 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
             throw new AssertionError(e);
         }
 
-        if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
-        {
-            logger.info("Loading persisted ring state");
-            Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
-            Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
-            for (InetAddress ep : loadedTokens.keySet())
-            {
-                if (ep.equals(FBUtilities.getBroadcastAddress()))
-                {
-                    // entry has been mistakenly added, delete it
-                    SystemKeyspace.removeEndpoint(ep);
-                }
-                else
-                {
-                    if (loadedHostIds.containsKey(ep))
-                        tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
-                    Gossiper.instance.addSavedEndpoint(ep);
-                }
-            }
-        }
-
         // daemon threads, like our executors', continue to run while shutdown hooks are
invoked
         drainOnShutdown = new Thread(new WrappedRunnable()
         {
@@ -724,6 +715,9 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
         if (!Boolean.parseBoolean(System.getProperty("cassandra.start_gossip", "true")))
         {
             logger.info("Not starting gossip as requested.");
+            // load ring state in preparation for starting gossip later
+            loadRingState();
+            initialized = true;
             return;
         }
 
@@ -758,6 +752,32 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
             }
             logger.info("Not joining ring as requested. Use JMX (StorageService->joinRing())
to initiate ring joining");
         }
+
+        initialized = true;
+    }
+
+    private void loadRingState()
+    {
+        if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
+        {
+            logger.info("Loading persisted ring state");
+            Multimap<InetAddress, Token> loadedTokens = SystemKeyspace.loadTokens();
+            Map<InetAddress, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
+            for (InetAddress ep : loadedTokens.keySet())
+            {
+                if (ep.equals(FBUtilities.getBroadcastAddress()))
+                {
+                    // entry has been mistakenly added, delete it
+                    SystemKeyspace.removeEndpoint(ep);
+                }
+                else
+                {
+                    if (loadedHostIds.containsKey(ep))
+                        tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
+                    Gossiper.instance.addSavedEndpoint(ep);
+                }
+            }
+        }
     }
 
     /**
@@ -793,47 +813,71 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 else
                     throw new ConfigurationException("This node was decommissioned and will not rejoin the ring unless cassandra.override_decommission=true has been set, or all existing data is removed and the node is bootstrapped again");
             }
-            if (replacing && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
-                throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
+
             if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null)
                 throw new RuntimeException("Replace method removed; use cassandra.replace_address instead");
+
+            if (!MessagingService.instance().isListening())
+                MessagingService.instance().listen();
+
+            UUID localHostId = SystemKeyspace.getLocalHostId();
+
             if (replacing)
             {
                 if (SystemKeyspace.bootstrapComplete())
                     throw new RuntimeException("Cannot replace address with a node that is already bootstrapped");
-                if (!DatabaseDescriptor.isAutoBootstrap())
-                    throw new RuntimeException("Trying to replace_address with auto_bootstrap disabled will not work, check your configuration");
-                bootstrapTokens = prepareReplacementInfo();
+
+                if (!(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true"))))
+                    throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node");
+
+                if (!DatabaseDescriptor.isAutoBootstrap() && !Boolean.getBoolean("cassandra.allow_unsafe_replace"))
+                    throw new RuntimeException("Replacing a node without bootstrapping risks invalidating consistency " +
+                                               "guarantees as the expected data may not be present until repair is run. " +
+                                               "To perform this operation, please restart with " +
+                                               "-Dcassandra.allow_unsafe_replace=true");
+
+                InetAddress replaceAddress = DatabaseDescriptor.getReplaceAddress();
+                localHostId = prepareReplacementInfo(replaceAddress);
                 appStates.put(ApplicationState.TOKENS, valueFactory.tokens(bootstrapTokens));
-                appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+
+                // if want to bootstrap the ranges of the node we're replacing,
+                // go into hibernate mode while that happens. Otherwise, persist
+                // the tokens we're taking over locally so that they don't get
+                // clobbered with auto generated ones in joinTokenRing
+                if (DatabaseDescriptor.isAutoBootstrap())
+                    appStates.put(ApplicationState.STATUS, valueFactory.hibernate(true));
+                else
+                    SystemKeyspace.updateTokens(bootstrapTokens);
             }
-            else if (shouldBootstrap())
+            else
             {
-                checkForEndpointCollision();
+                checkForEndpointCollision(localHostId);
             }
 
             // have to start the gossip service before we can see any info on other nodes.  this is necessary
             // for bootstrap to get the load info it needs.
             // (we won't be part of the storage ring though until we add a counterId to our state, below.)
             // Seed the host ID-to-endpoint map with our own ID.
-            UUID localHostId = SystemKeyspace.getLocalHostId();
             getTokenMetadata().updateHostId(localHostId, FBUtilities.getBroadcastAddress());
             appStates.put(ApplicationState.NET_VERSION, valueFactory.networkVersion());
             appStates.put(ApplicationState.HOST_ID, valueFactory.hostId(localHostId));
             appStates.put(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getBroadcastRpcAddress()));
             appStates.put(ApplicationState.RELEASE_VERSION, valueFactory.releaseVersion());
+
+            // load the persisted ring state. This used to be done earlier in the init process,
+            // but now we always perform a shadow round when preparing to join and we have to
+            // clear endpoint states after doing that.
+            loadRingState();
+
             logger.info("Starting up server gossip");
             Gossiper.instance.register(this);
             Gossiper.instance.start(SystemKeyspace.incrementAndGetGeneration(), appStates); // needed for node-ring gathering.
+            gossipActive = true;
             // gossip snitch infos (local DC and rack)
             gossipSnitchInfo();
             // gossip Schema.emptyVersion forcing immediate check for schema updates (see MigrationManager#maybeScheduleSchemaPull)
             Schema.instance.updateVersionAndAnnounce(); // Ensure we know our own actual Schema UUID in preparation for updates
-
-            if (!MessagingService.instance().isListening())
-                MessagingService.instance().listen();
             LoadBroadcaster.instance.startBroadcasting();
-
             HintsService.instance.startDispatch();
             BatchlogManager.instance.start();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8978472..277fbe9 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -478,7 +478,7 @@ public interface StorageServiceMBean extends NotificationEmitter
     // allows a user to forcibly completely stop cassandra
     public void stopDaemon();
 
-    // to determine if gossip is disabled
+    // to determine if initialization has completed
     public boolean isInitialized();
 
     // allows a user to disable thrift

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/tools/nodetool/Info.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java b/src/java/org/apache/cassandra/tools/nodetool/Info.java
index 0d9bd73..268d9df 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Info.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java
@@ -43,7 +43,7 @@ public class Info extends NodeToolCmd
     @Override
     public void execute(NodeProbe probe)
     {
-        boolean gossipInitialized = probe.isInitialized();
+        boolean gossipInitialized = probe.isGossipRunning();
 
         System.out.printf("%-23s: %s%n", "ID", probe.getLocalHostId());
         System.out.printf("%-23s: %s%n", "Gossip active", gossipInitialized);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
index ab3471c..bda7997 100644
--- a/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
+++ b/src/java/org/apache/cassandra/utils/JVMStabilityInspector.java
@@ -31,9 +31,7 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.Cassandra;
 
 /**
  * Responsible for deciding whether to kill the JVM if it gets in an "unstable" state (think OOM).
@@ -76,7 +74,7 @@ public final class JVMStabilityInspector
 
     public static void inspectCommitLogThrowable(Throwable t)
     {
-        if (!StorageService.instance.isSetupCompleted())
+        if (!StorageService.instance.isDaemonSetupCompleted())
         {
             logger.error("Exiting due to error while processing commit log during initialization.", t);
             killer.killCurrentJVM(t, true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2bc5f0c6/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 992c4d6..6aea343 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -71,6 +71,8 @@ public class SchemaLoader
 
     public static void startGossiper()
     {
+        // skip shadow round and endpoint collision check in tests
+        System.setProperty("cassandra.allow_unsafe_join", "true");
         if (!Gossiper.instance.isEnabled())
             Gossiper.instance.start((int) (System.currentTimeMillis() / 1000));
     }


Mime
View raw message