Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 12AB7D922 for ; Wed, 3 Oct 2012 21:31:01 +0000 (UTC) Received: (qmail 66751 invoked by uid 500); 3 Oct 2012 21:31:00 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 66714 invoked by uid 500); 3 Oct 2012 21:31:00 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 66701 invoked by uid 99); 3 Oct 2012 21:31:00 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 03 Oct 2012 21:31:00 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 97E3F37CA6; Wed, 3 Oct 2012 21:31:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: git commit: optimize batchlog flushing to skip successful batches patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4667 Message-Id: <20121003213100.97E3F37CA6@tyr.zones.apache.org> Date: Wed, 3 Oct 2012 21:31:00 +0000 (UTC) Updated Branches: refs/heads/trunk dbcf00df8 -> 637af9d78 optimize batchlog flushing to skip successful batches patch by Aleksey Yeschenko; reviewed by jbellis for CASSANDRA-4667 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/637af9d7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/637af9d7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/637af9d7 Branch: refs/heads/trunk Commit: 637af9d78272d4c849d865a5cfe4b041ba86f35e Parents: dbcf00d Author: Jonathan Ellis Authored: Wed Oct 3 12:47:45 2012 -0500 Committer: Jonathan Ellis Committed: Wed Oct 3 16:30:13 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 3 +- .../org/apache/cassandra/db/BatchlogManager.java | 31 ++++++++++++--- src/java/org/apache/cassandra/db/Memtable.java | 8 ++++ 4 files changed, 36 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 3fa5997..01e03d7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2-beta2 + * optimize batchlog flushing to skip successful batches (CASSANDRA-4667) * include metadata for system keyspace itself in schema tables (CASSANDRA-4416) * add check to PropertyFileSnitch to verify presence of location for local node (CASSANDRA-4728) http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 8aa4422..6abeb33 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -198,7 +198,8 @@ public final class CFMetaData + "id uuid PRIMARY KEY," + "written_at timestamp," + "data blob" - + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0"); + + ") WITH COMMENT='uncommited batches' AND gc_grace_seconds=0 " + + "AND COMPACTION={'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 2}"); public static final CFMetaData RangeXfersCf = compile(17, "CREATE TABLE " + SystemTable.RANGE_XFERS_CF + " (" + "token_bytes blob PRIMARY KEY," http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index f732ece..e2a9c0d 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -35,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.NamesQueryFilter; import org.apache.cassandra.db.filter.QueryFilter; @@ -46,12 +48,15 @@ import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; public class BatchlogManager implements BatchlogManagerMBean { @@ -81,9 +86,9 @@ public class BatchlogManager implements BatchlogManagerMBean throw new RuntimeException(e); } - Runnable runnable = new Runnable() + Runnable runnable = new WrappedRunnable() { - public void run() + public void runMayThrow() throws ExecutionException, InterruptedException { replayAllFailedBatches(); } @@ -114,9 +119,9 @@ public class BatchlogManager implements BatchlogManagerMBean public void forceBatchlogReplay() { - Runnable runnable = new Runnable() + Runnable runnable = new WrappedRunnable() { - public void run() + public void runMayThrow() throws ExecutionException, InterruptedException { replayAllFailedBatches(); } @@ -158,7 +163,7 @@ public class BatchlogManager implements BatchlogManagerMBean return ByteBuffer.wrap(bos.toByteArray()); } - private void replayAllFailedBatches() + private void replayAllFailedBatches() throws ExecutionException, InterruptedException { if (!isReplaying.compareAndSet(false, true)) return; @@ -176,6 +181,8 @@ public class BatchlogManager implements BatchlogManagerMBean if (writtenAt == null || System.currentTimeMillis() > LongType.instance.compose(writtenAt.value()) + TIMEOUT) replayBatch(row.key); } + + cleanup(); } finally { @@ -192,7 +199,7 @@ public class BatchlogManager implements BatchlogManagerMBean logger.debug("Replaying batch {}", uuid); ColumnFamilyStore store = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF); - QueryFilter filter = QueryFilter.getNamesFilter(key, new QueryPath(SystemTable.BATCHLOG_CF), DATA); + QueryFilter filter = QueryFilter.getIdentityFilter(key, new QueryPath(SystemTable.BATCHLOG_CF)); ColumnFamily batch = store.getColumnFamily(filter); if (batch == null || batch.isMarkedForDelete()) @@ -257,4 +264,16 @@ public class BatchlogManager implements BatchlogManagerMBean AbstractBounds range = new Range(minPosition, minPosition, partitioner); return store.getRangeSlice(null, range, Integer.MAX_VALUE, columnFilter, null); } + + /** force flush + compaction to reclaim space from replayed batches */ + private void cleanup() throws ExecutionException, InterruptedException + { + ColumnFamilyStore cfs = Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF); + cfs.forceBlockingFlush(); + Collection descriptors = new ArrayList(); + for (SSTableReader sstr : cfs.getSSTables()) + descriptors.add(sstr.descriptor); + if (!descriptors.isEmpty()) // don't pollute the logs if there is nothing to compact. + CompactionManager.instance.submitUserDefined(cfs, descriptors, Integer.MAX_VALUE).get(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/637af9d7/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 4424811..4631690 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -440,6 +440,14 @@ public class Memtable ColumnFamily cf = entry.getValue(); if (cf.isMarkedForDelete()) { + // When every node is up, there's no reason to write batchlog data out to sstables + // (which in turn incurs cost like compaction) since the BL write + delete cancel each other out, + // and BL data is strictly local, so we don't need to preserve tombstones for repair. + // If we have a data row + row level tombstone, then writing it is effectively an expensive no-op so we skip it. + // See CASSANDRA-4667. + if (cfs.columnFamily.equals(SystemTable.BATCHLOG_CF) && cfs.table.name.equals(Table.SYSTEM_KS) && !cf.isEmpty()) + continue; + // Pedantically, you could purge column level tombstones that are past GcGRace when writing to the SSTable. // But it can result in unexpected behaviour where deletes never make it to disk, // as they are lost and so cannot override existing column values. So we only remove deleted columns if there