cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] git commit: Paginate batchlog replay
Date Fri, 17 Jan 2014 16:41:02 GMT
Updated Branches:
  refs/heads/cassandra-2.0 14c6f7030 -> 881462369


Paginate batchlog replay

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6569


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0b168f0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0b168f0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0b168f0

Branch: refs/heads/cassandra-2.0
Commit: b0b168f0690f7e1d2c0e3401ea0e74d3bbccd164
Parents: a3f7035
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Fri Jan 17 19:24:49 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Fri Jan 17 19:24:49 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/BatchlogManager.java    | 78 +++++++++++++++----
 .../cassandra/db/BatchlogManagerTest.java       | 82 ++++++++++++++++++++
 3 files changed, 144 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5827d3..f550863 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,7 @@
  * Fix executing LOCAL_QUORUM with SimpleStrategy (CASSANDRA-6545)
  * Avoid StackOverflow when using large IN queries (CASSANDRA-6567)
  * Nodetool upgradesstables includes secondary indexes (CASSANDRA-6589)
+ * Paginate batchlog replay (CASSANDRA-6569)
 
 
 1.2.13

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 1af4909..90dfd47 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.RateLimiter;
@@ -45,6 +46,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.marshal.UUIDType;
@@ -66,8 +68,8 @@ public class BatchlogManager implements BatchlogManagerMBean
 {
     private static final String MBEAN_NAME = "org.apache.cassandra.db:type=BatchlogManager";
     private static final int VERSION = MessagingService.VERSION_12;
-    private static final long TIMEOUT = 2 * DatabaseDescriptor.getWriteRpcTimeout();
     private static final long REPLAY_INTERVAL = 60 * 1000; // milliseconds
+    private static final int PAGE_SIZE = 128; // same as HHOM, for now, w/out using any heuristics. TODO: set based on avg batch size.
 
     private static final Logger logger = LoggerFactory.getLogger(BatchlogManager.class);
     public static final BatchlogManager instance = new BatchlogManager();
@@ -124,14 +126,19 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
     {
-        long timestamp = FBUtilities.timestampMicros();
-        ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
+        return getBatchlogMutationFor(mutations, uuid, FBUtilities.timestampMicros());
+    }
+
+    @VisibleForTesting
+    static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid, long now)
+    {
+        ByteBuffer writtenAt = LongType.instance.decompose(now / 1000);
         ByteBuffer data = serializeRowMutations(mutations);
 
         ColumnFamily cf = ColumnFamily.create(CFMetaData.BatchlogCf);
-        cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
-        cf.addColumn(new Column(columnName("written_at"), writtenAt, timestamp));
-        cf.addColumn(new Column(columnName("data"), data, timestamp));
+        cf.addColumn(new Column(columnName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, now));
+        cf.addColumn(new Column(columnName("written_at"), writtenAt, now));
+        cf.addColumn(new Column(columnName("data"), data, now));
         RowMutation rm = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(uuid));
         rm.add(cf);
 
@@ -157,7 +164,8 @@ public class BatchlogManager implements BatchlogManagerMBean
         return ByteBuffer.wrap(bos.toByteArray());
     }
 
-    private void replayAllFailedBatches() throws ExecutionException, InterruptedException
+    @VisibleForTesting
+    void replayAllFailedBatches() throws ExecutionException, InterruptedException
     {
         if (!isReplaying.compareAndSet(false, true))
             return;
@@ -171,9 +179,25 @@ public class BatchlogManager implements BatchlogManagerMBean
 
         try
         {
-            for (UntypedResultSet.Row row : process("SELECT id, written_at FROM %s.%s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF))
-                if (System.currentTimeMillis() > row.getLong("written_at") + TIMEOUT)
-                    replayBatch(row.getUUID("id"), rateLimiter);
+            UntypedResultSet page = process("SELECT id, data, written_at FROM %s.%s LIMIT %d",
+                                            Table.SYSTEM_KS,
+                                            SystemTable.BATCHLOG_CF,
+                                            PAGE_SIZE);
+
+            while (!page.isEmpty())
+            {
+                UUID id = processBatchlogPage(page, rateLimiter);
+
+                if (page.size() < PAGE_SIZE)
+                    break; // we've exhausted the batchlog, next query would be empty.
+
+                page = process("SELECT id, data, written_at FROM %s.%s WHERE token(id) > token(%s) LIMIT %d",
+                               Table.SYSTEM_KS,
+                               SystemTable.BATCHLOG_CF,
+                               id,
+                               PAGE_SIZE);
+            }
+
             cleanup();
         }
         finally
@@ -184,28 +208,48 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Finished replayAllFailedBatches");
     }
 
-    private void replayBatch(UUID id, RateLimiter rateLimiter)
+    // returns the UUID of the last seen batch
+    private UUID processBatchlogPage(UntypedResultSet page, RateLimiter rateLimiter)
     {
-        logger.debug("Replaying batch {}", id);
+        UUID id = null;
+        for (UntypedResultSet.Row row : page)
+        {
+            id = row.getUUID("id");
+            long writtenAt = row.getLong("written_at");
+            // enough time for the actual write + batchlog entry mutation delivery (two separate requests).
+            long timeout = DatabaseDescriptor.getWriteRpcTimeout() * 2; // enough time for the actual write + BM removal mutation
+            if (System.currentTimeMillis() < writtenAt + timeout)
+                continue; // not ready to replay yet, might still get a deletion.
+            replayBatch(id, row.getBytes("data"), writtenAt, rateLimiter);
+        }
+        return id;
+    }
 
-        UntypedResultSet result = process("SELECT written_at, data FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
-        if (result.isEmpty())
-            return;
+    private void replayBatch(UUID id, ByteBuffer data, long writtenAt, RateLimiter rateLimiter)
+    {
+        logger.debug("Replaying batch {}", id);
 
         try
         {
-            replaySerializedMutations(result.one().getBytes("data"), result.one().getLong("written_at"), rateLimiter);
+            replaySerializedMutations(data, writtenAt, rateLimiter);
         }
         catch (IOException e)
         {
             logger.warn("Skipped batch replay of {} due to {}", id, e);
         }
 
-        process("DELETE FROM %s.%s WHERE id = %s", Table.SYSTEM_KS, SystemTable.BATCHLOG_CF, id);
+        deleteBatch(id);
 
         totalBatchesReplayed.incrementAndGet();
     }
 
+    private void deleteBatch(UUID id)
+    {
+        RowMutation mutation = new RowMutation(Table.SYSTEM_KS, UUIDType.instance.decompose(id));
+        mutation.delete(new QueryPath(SystemTable.BATCHLOG_CF, null, null), System.currentTimeMillis());
+        mutation.apply();
+    }
+
     private void replaySerializedMutations(ByteBuffer data, long writtenAt, RateLimiter rateLimiter) throws IOException
     {
         DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0b168f0/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
new file mode 100644
index 0000000..637815b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java
@@ -0,0 +1,82 @@
+package org.apache.cassandra.db;
+
+import java.net.InetAddress;
+import java.util.Collections;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+
+public class BatchlogManagerTest extends SchemaLoader
+{
+    @Before
+    public void setUp() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        InetAddress localhost = InetAddress.getByName("127.0.0.1");
+        metadata.updateNormalToken(Util.token("A"), localhost);
+        metadata.updateHostId(UUIDGen.getTimeUUID(), localhost);
+    }
+
+    @Test
+    public void testReplay() throws Exception
+    {
+        assertEquals(0, BatchlogManager.instance.countAllBatches());
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+        // Generate 1000 mutations and put them all into the batchlog.
+        // Half (500) ready to be replayed, half not.
+        for (int i = 0; i < 1000; i++)
+        {
+            RowMutation mutation = new RowMutation("Keyspace1", bytes(i));
+            mutation.add(new QueryPath("Standard1", null, bytes(i)), bytes(i), 0);
+            long timestamp = System.currentTimeMillis();
+            if (i < 500)
+                timestamp -= DatabaseDescriptor.getWriteRpcTimeout() * 2;
+            BatchlogManager.getBatchlogMutationFor(Collections.singleton(mutation), UUIDGen.getTimeUUID(), timestamp * 1000).apply();
+        }
+
+        assertEquals(1000, BatchlogManager.instance.countAllBatches());
+        assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed());
+
+        // Force batchlog replay.
+        BatchlogManager.instance.replayAllFailedBatches();
+
+        // Ensure that the first half, and only the first half, got replayed.
+        assertEquals(500, BatchlogManager.instance.countAllBatches());
+        assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed());
+
+        for (int i = 0; i < 1000; i++)
+        {
+            UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard1\" WHERE key = intAsBlob(%d)", i));
+            if (i < 500)
+            {
+                assertEquals(bytes(i), result.one().getBytes("key"));
+                assertEquals(bytes(i), result.one().getBytes("column1"));
+                assertEquals(bytes(i), result.one().getBytes("value"));
+            }
+            else
+            {
+                assertTrue(result.isEmpty());
+            }
+        }
+
+        // Ensure that no stray mutations got somehow applied.
+        UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\""));
+        assertEquals(500, result.one().getLong("count"));
+    }
+}


Mime
View raw message