cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject git commit: optimize batchlog flushing to skip successful batches patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4667
Date Wed, 03 Oct 2012 21:31:00 GMT
Updated Branches:
  refs/heads/trunk dbcf00df8 -> 637af9d78


optimize batchlog flushing to skip successful batches
patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4667


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/637af9d7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/637af9d7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/637af9d7

Branch: refs/heads/trunk
Commit: 637af9d78272d4c849d865a5cfe4b041ba86f35e
Parents: dbcf00d
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Wed Oct 3 12:47:45 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Wed Oct 3 16:30:13 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../org/apache/cassandra/config/CFMetaData.java    |    3 +-
 .../org/apache/cassandra/db/BatchlogManager.java   |   31 ++++++++++++---
 src/java/org/apache/cassandra/db/Memtable.java     |    8 ++++
 4 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3fa5997..01e03d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2-beta2
+ * optimize batchlog flushing to skip successful batches (CASSANDRA-4667)
  * include metadata for system keyspace itself in schema tables (CASSANDRA-4416)
  * add check to PropertyFileSnitch to verify presence of location for
    local node (CASSANDRA-4728)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 8aa4422..6abeb33 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -198,7 +198,8 @@ public final class CFMetaData
                                                             + "id uuid PRIMARY KEY,"
                                                             + "written_at timestamp,"
                                                             + "data blob"
-                                                            + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0");
+                                                            + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0 "
+                                                            + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 2}");
 
     public static final CFMetaData RangeXfersCf = compile(17, "CREATE TABLE " + SystemTable.RANGE_XFERS_CF + " ("
                                                               + "token_bytes blob PRIMARY KEY,"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index f732ece..e2a9c0d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.filter.NamesQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -46,12 +48,15 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
 
 public class BatchlogManager implements BatchlogManagerMBean
 {
@@ -81,9 +86,9 @@ public class BatchlogManager implements BatchlogManagerMBean
             throw new RuntimeException(e);
         }
 
-        Runnable runnable = new Runnable()
+        Runnable runnable = new WrappedRunnable()
         {
-            public void run()
+            public void runMayThrow() throws ExecutionException, InterruptedException
             {
                 replayAllFailedBatches();
             }
@@ -114,9 +119,9 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     public void forceBatchlogReplay()
     {
-        Runnable runnable = new Runnable()
+        Runnable runnable = new WrappedRunnable()
         {
-            public void run()
+            public void runMayThrow() throws ExecutionException, InterruptedException
             {
                 replayAllFailedBatches();
             }
@@ -158,7 +163,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         return ByteBuffer.wrap(bos.toByteArray());
     }
 
-    private void replayAllFailedBatches()
+    private void replayAllFailedBatches() throws ExecutionException, InterruptedException
     {
         if (!isReplaying.compareAndSet(false, true))
             return;
@@ -176,6 +181,8 @@ public class BatchlogManager implements BatchlogManagerMBean
                 if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT)
                     replayBatch(row.key);
             }
+
+            cleanup();
         }
         finally
         {
@@ -192,7 +199,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         logger.debug("Replaying batch {}", uuid);
 
         ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
-        QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA);
+        QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(SystemTable.BATCHLOG_CF));
         ColumnFamily batch = store.getColumnFamily(filter);
 
         if (batch == null || batch.isMarkedForDelete())
@@ -257,4 +264,16 @@ public class BatchlogManager implements BatchlogManagerMBean
         AbstractBounds<RowPosition> range = new Range<RowPosition>(minPosition, minPosition, partitioner);
         return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null);
     }
+
+    /** force flush + compaction to reclaim space from replayed batches */
+    private void cleanup() throws ExecutionException, InterruptedException
+    {
+        ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF);
+        cfs.forceBlockingFlush();
+        Collection<Descriptor> descriptors = new ArrayList<Descriptor>();
+        for (SSTableReader sstr : cfs.getSSTables())
+            descriptors.add(sstr.descriptor);
+        if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact.
+            CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 4424811..4631690 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -440,6 +440,14 @@ public class Memtable
                     ColumnFamily cf = entry.getValue();
                     if (cf.isMarkedForDelete())
                     {
+                        // When every node is up, there's no reason to write batchlog data out to sstables
+                        // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out,
+                        // and BL data is strictly local, so we don't need to preserve tombstones for repair.
+                        // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it.
+                        // See CASSANDRA-4667.
+                        if (cfs.columnFamily.equals(SystemTable.BATCHLOG_CF) && cfs.table.name.equals(Table.SYSTEM_KS) && !cf.isEmpty())
+                            continue;
+
                         // Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable.
                         // But it can result in unexpected behaviour where deletes never make it to disk,
                         // as they are lost and so cannot override existing column values. So we only remove deleted columns if there


Mime
View raw message