cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject cassandra git commit: Wait for migration responses to complete before bootstrapping
Date Tue, 24 Nov 2015 23:23:20 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 29b988d8c -> ae315b5ec


Wait for migration responses to complete before bootstrapping

patch by Mike Adamson; reviewed by Sergio Bossa for CASSANDRA-10731


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ae315b5e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ae315b5e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ae315b5e

Branch: refs/heads/cassandra-3.0
Commit: ae315b5ec944571342146867c51b2ceb50f3845e
Parents: 29b988d
Author: Mike Adamson <madamson@datastax.com>
Authored: Mon Nov 16 15:48:33 2015 +0000
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Tue Nov 24 23:18:14 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/service/MigrationManager.java     | 24 ++++++++++++++++--
 .../apache/cassandra/service/MigrationTask.java | 26 ++++++++++++++++++++
 .../cassandra/service/StorageService.java       |  9 +++----
 4 files changed, 53 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 608d8f8..116d4c3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.1
+ * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731)
  * Unable to create a function with argument of type Inet (CASSANDRA-10741)
  * Fix backward incompatibiliy in CqlInputFormat (CASSANDRA-10717)
  * Correctly preserve deletion info on updated rows when notifying indexers

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index b7f9bf3..c0b5b10 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -59,8 +59,10 @@ public class MigrationManager
 
     public static final int MIGRATION_DELAY_IN_MS = 60000;
 
+    private static final int MIGRATION_TASK_WAIT_IN_SECONDS = Integer.parseInt(System.getProperty("cassandra.migration_task_wait_in_seconds", "1"));
+
     private final List<MigrationListener> listeners = new CopyOnWriteArrayList<>();
-    
+
     private MigrationManager() {}
 
     public void register(MigrationListener listener)
@@ -148,7 +150,25 @@ public class MigrationManager
 
     public static boolean isReadyForBootstrap()
     {
-        return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
+        return MigrationTask.getInflightTasks().isEmpty();
+    }
+
+    public static void waitUntilReadyForBootstrap()
+    {
+        CountDownLatch completionLatch;
+        while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
+        {
+            try
+            {
+                if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
+                    logger.error("Migration task failed to complete");
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                logger.error("Migration task was interrupted");
+            }
+        }
     }
 
     public void notifyCreateKeyspace(KeyspaceMetadata ksm)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java
index 8a1b858..39a5a11 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -20,11 +20,17 @@ package org.apache.cassandra.service;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -39,6 +45,10 @@ class MigrationTask extends WrappedRunnable
 {
     private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);
 
+    private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();
+
+    private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);
+
     private final InetAddress endpoint;
 
     MigrationTask(InetAddress endpoint)
@@ -46,6 +56,11 @@ class MigrationTask extends WrappedRunnable
         this.endpoint = endpoint;
     }
 
+    public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
+    {
+        return inflightTasks;
+    }
+
     public void runMayThrow() throws Exception
     {
         // There is a chance that quite some time could have passed between now and the MM#maybeScheduleSchemaPull(),
@@ -65,6 +80,8 @@ class MigrationTask extends WrappedRunnable
 
         MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);
 
+        final CountDownLatch completionLatch = new CountDownLatch(1);
+
         IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
         {
             @Override
@@ -78,6 +95,10 @@ class MigrationTask extends WrappedRunnable
                 {
                     logger.error("Configuration exception merging remote schema", e);
                 }
+                finally
+                {
+                    completionLatch.countDown();
+                }
             }
 
             public boolean isLatencyForSnitch()
@@ -85,6 +106,11 @@ class MigrationTask extends WrappedRunnable
                 return false;
             }
         };
+
+        // Only save the latches if we need bootstrap or are bootstrapping
+        if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
+            inflightTasks.offer(completionLatch);
+
         MessagingService.instance().sendRR(message, endpoint, cb);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ae315b5e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 1c20a22..1baa478 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.service;
 
-import static java.nio.charset.StandardCharsets.ISO_8859_1;
-
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.File;
@@ -848,12 +846,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 }
                 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
             }
-            // if our schema hasn't matched yet, keep sleeping until it does
+            // if our schema hasn't matched yet, wait until it has
+            // we do this by waiting for all in-flight migration requests and responses to complete
             // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
-            while (!MigrationManager.isReadyForBootstrap())
+            if (!MigrationManager.isReadyForBootstrap())
             {
                 setMode(Mode.JOINING, "waiting for schema information to complete", true);
-                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+                MigrationManager.waitUntilReadyForBootstrap();
             }
             setMode(Mode.JOINING, "schema complete, ready to bootstrap", true);
             setMode(Mode.JOINING, "waiting for pending range calculation", true);


Mime
View raw message