cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] git commit: Force batchlog replay before decommissioning a node
Date Fri, 17 Oct 2014 00:40:30 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 014d328f4 -> 440824c1a


Force batchlog replay before decommissioning a node

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-7446


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e916dff8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e916dff8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e916dff8

Branch: refs/heads/cassandra-2.1
Commit: e916dff8ba032d878ad4435eb7175c6a56f79ef4
Parents: 67db1bf
Author: Branimir Lambov <branimir.lambov@datastax.com>
Authored: Fri Oct 17 03:18:37 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Fri Oct 17 03:18:37 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 63 ++++++++++----------
 .../cassandra/service/StorageService.java       | 25 ++++++--
 .../cassandra/db/BatchlogManagerTest.java       |  8 +--
 4 files changed, 57 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd4b6bb..73aaab0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.11:
+ * Force batchlog replay before decommissioning a node (CASSANDRA-7446)
  * Fix hint replay with many accumulated expired hints (CASSANDRA-6998)
  * Fix duplicate results in DISTINCT queries on static columns with query
    paging (CASSANDRA-8108)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index b92c217..48f4c3c 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -25,7 +25,6 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -69,8 +68,8 @@ public class BatchlogManager implements BatchlogManagerMBean
     public static final BatchlogManager instance = new BatchlogManager();
 
     private final AtomicLong totalBatchesReplayed = new AtomicLong();
-    private final AtomicBoolean isReplaying = new AtomicBoolean();
 
+    // Single-thread executor service for scheduling and serializing log replay.
     public static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
 
     public void start()
@@ -108,6 +107,11 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public void forceBatchlogReplay()
     {
+        startBatchlogReplay();
+    }
+
+    public Future<?> startBatchlogReplay()
+    {
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws ExecutionException, InterruptedException
@@ -115,7 +119,8 @@ public class BatchlogManager implements BatchlogManagerMBean
                 replayAllFailedBatches();
             }
         };
-        batchlogTasks.execute(runnable);
+        // If a replay is already in progress this request will be executed after it completes.
+        return batchlogTasks.submit(runnable);
     }
 
     public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
@@ -156,12 +161,8 @@ public class BatchlogManager implements BatchlogManagerMBean
         return ByteBuffer.wrap(bos.toByteArray());
     }
 
-    @VisibleForTesting
-    void replayAllFailedBatches() throws ExecutionException, InterruptedException
+    private void replayAllFailedBatches() throws ExecutionException, InterruptedException
     {
-        if (!isReplaying.compareAndSet(false, true))
-            return;
-
         logger.debug("Started replayAllFailedBatches");
 
         // rate limit is in bytes per second. Uses Double.MAX_VALUE if disabled (set to 0 in cassandra.yaml).
@@ -169,34 +170,27 @@ public class BatchlogManager implements BatchlogManagerMBean
         int throttleInKB = DatabaseDescriptor.getBatchlogReplayThrottleInKB() / StorageService.instance.getTokenMetadata().getAllEndpoints().size();
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE : throttleInKB * 1024);
 
-        try
-        {
-            UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
-                                            Keyspace.SYSTEM_KS,
-                                            SystemKeyspace.BATCHLOG_CF,
-                                            PAGE_SIZE);
-
-            while (!page.isEmpty())
-            {
-                UUID id = processBatchlogPage(page, rateLimiter);
+        UntypedResultSet page = process("SELECT id, data, written_at, version FROM %s.%s LIMIT %d",
+                                        Keyspace.SYSTEM_KS,
+                                        SystemKeyspace.BATCHLOG_CF,
+                                        PAGE_SIZE);
 
-                if (page.size() < PAGE_SIZE)
-                    break; // we've exhausted the batchlog, next query would be empty.
+        while (!page.isEmpty())
+        {
+            UUID id = processBatchlogPage(page, rateLimiter);
 
-                page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
-                               Keyspace.SYSTEM_KS,
-                               SystemKeyspace.BATCHLOG_CF,
-                               id,
-                               PAGE_SIZE);
-            }
+            if (page.size() < PAGE_SIZE)
+                break; // we've exhausted the batchlog, next query would be empty.
 
-            cleanup();
-        }
-        finally
-        {
-            isReplaying.set(false);
+            page = process("SELECT id, data, written_at, version FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
+                           Keyspace.SYSTEM_KS,
+                           SystemKeyspace.BATCHLOG_CF,
+                           id,
+                           PAGE_SIZE);
         }
 
+        cleanup();
+
         logger.debug("Finished replayAllFailedBatches");
     }
 
@@ -210,7 +204,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             long writtenAt = row.getLong("written_at");
             int version = row.has("version") ? row.getInt("version") : MessagingService.VERSION_12;
             // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
-            long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+            long timeout = getBatchlogTimeout();
             if (System.currentTimeMillis() < writtenAt + timeout)
                 continue; // not ready to replay yet, might still get a deletion.
             replayBatch(id, row.getBytes("data"), writtenAt, version, rateLimiter);
@@ -218,6 +212,11 @@ public class BatchlogManager implements BatchlogManagerMBean
         return id;
     }
 
+    public long getBatchlogTimeout()
+    {
+        return DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+    }
+
     private void replayBatch(UUID id, ByteBuffer data, long writtenAt, int version, RateLimiter rateLimiter)
     {
         logger.debug("Replaying batch {}", id);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index cca6f79..56056ab 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2914,8 +2914,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (logger.isDebugEnabled())
             logger.debug("DECOMMISSIONING");
         startLeaving();
-        setMode(Mode.LEAVING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
-        Thread.sleep(RING_DELAY);
+        long timeout = Math.max(RING_DELAY, BatchlogManager.instance.getBatchlogTimeout());
+        setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true);
+        Thread.sleep(timeout);
 
         Runnable finishLeaving = new Runnable()
         {
@@ -2958,13 +2959,29 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             rangesToStream.put(keyspaceName, rangesMM);
         }
 
-        setMode(Mode.LEAVING, "streaming data to other nodes", true);
+        setMode(Mode.LEAVING, "replaying batch log and streaming data to other nodes", true);
 
+        // Start with BatchLog replay, which may create hints but no writes since this is no longer a valid endpoint.
+        Future<?> batchlogReplay = BatchlogManager.instance.startBatchlogReplay();
         Future<StreamState> streamSuccess = streamRanges(rangesToStream);
+
+        // Wait for batch log to complete before streaming hints.
+        logger.debug("waiting for batch log processing.");
+        try
+        {
+            batchlogReplay.get();
+        }
+        catch (ExecutionException | InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        setMode(Mode.LEAVING, "streaming hints to other nodes", true);
+
         Future<StreamState> hintsSuccess = streamHints();
 
         // wait for the transfer runnables to signal the latch.
-        logger.debug("waiting for stream aks.");
+        logger.debug("waiting for stream acks.");
         try
         {
             streamSuccess.get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e916dff8/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
index 0b6a908..846b008 100644
--- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -77,8 +77,8 @@ public class BatchlogManagerTest extends SchemaLoader
         assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches);
         assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches);
 
-        // Force batchlog replay.
-        BatchlogManager.instance.replayAllFailedBatches();
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.startBatchlogReplay().get();
 
         // Ensure that the first half, and only the first half, got replayed.
         assertEquals(500, BatchlogManager.instance.countAllBatches() - initialAllBatches);
@@ -138,8 +138,8 @@ public class BatchlogManagerTest extends SchemaLoader
         // Flush the batchlog to disk (see CASSANDRA-6822).
         Keyspace.open(Keyspace.SYSTEM_KS).getColumnFamilyStore(SystemKeyspace.BATCHLOG_CF).forceFlush();
 
-        // Force batchlog replay.
-        BatchlogManager.instance.replayAllFailedBatches();
+        // Force batchlog replay and wait for it to complete.
+        BatchlogManager.instance.startBatchlogReplay().get();
 
         // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied.
         for (int i = 0; i < 1000; i++)


Mime
View raw message