Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EB43818443 for ; Mon, 25 Jan 2016 09:13:16 +0000 (UTC) Received: (qmail 18475 invoked by uid 500); 25 Jan 2016 09:13:16 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 18439 invoked by uid 500); 25 Jan 2016 09:13:16 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 18403 invoked by uid 99); 25 Jan 2016 09:13:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 25 Jan 2016 09:13:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 50BA0E012C; Mon, 25 Jan 2016 09:13:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Mon, 25 Jan 2016 09:13:16 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [01/10] cassandra git commit: maxPurgeableTimestamp needs to check memtables too Repository: cassandra Updated Branches: refs/heads/cassandra-2.2 b4d67c9e5 -> b5d6d4f72 refs/heads/cassandra-3.0 2f8e5f346 -> 442f4737c refs/heads/cassandra-3.3 acb7fed1d -> f2174280d refs/heads/trunk 72790dc8e -> 4abd99363 maxPurgeableTimestamp needs to check memtables too Patch by Stefania Alborghetti; reviewed by marcuse for CASSANDRA-9949 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b5d6d4f7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b5d6d4f7 
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b5d6d4f7 Branch: refs/heads/cassandra-2.2 Commit: b5d6d4f72299a0b08ce3279aade507e2a999acc6 Parents: b4d67c9 Author: Stefania Alborghetti Authored: Mon Jan 4 16:24:51 2016 +0100 Committer: Stefania Alborghetti Committed: Mon Jan 25 13:30:05 2016 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/AtomicBTreeColumns.java | 14 +- src/java/org/apache/cassandra/db/Memtable.java | 15 +- .../db/compaction/CompactionController.java | 21 +- .../db/compaction/LazilyCompactedRow.java | 2 +- .../db/compaction/CompactionControllerTest.java | 191 +++++++++++++++++++ 6 files changed, 232 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 59997ff..cdc3b34 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.5 + * maxPurgeableTimestamp needs to check memtables too (CASSANDRA-9949) * Apply change to compaction throughput in real time (CASSANDRA-10025) * Fix potential NPE on ORDER BY queries with IN (CASSANDRA-10955) * Avoid over-fetching during the page of range queries (CASSANDRA-8521) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java index 710b289..f5b7712 100644 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java @@ -198,7 +198,7 @@ public class AtomicBTreeColumns extends ColumnFamily * * @return the difference in size seen after merging the given columns */ - public 
Pair addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) + public ColumnUpdater addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) { ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer); DeletionInfo inputDeletionInfoCopy = null; @@ -237,7 +237,7 @@ public class AtomicBTreeColumns extends ColumnFamily { indexer.updateRowLevelIndexes(); updater.finish(); - return Pair.create(updater.dataSize, updater.colUpdateTimeDelta); + return updater; } else if (!monitorOwned) { @@ -429,7 +429,7 @@ public class AtomicBTreeColumns extends ColumnFamily } // the function we provide to the btree utilities to perform any column replacements - private static final class ColumnUpdater implements UpdateFunction + static final class ColumnUpdater implements UpdateFunction { final AtomicBTreeColumns updating; final CFMetaData metadata; @@ -442,6 +442,7 @@ public class AtomicBTreeColumns extends ColumnFamily long colUpdateTimeDelta = Long.MAX_VALUE; final MemtableAllocator.DataReclaimer reclaimer; List inserted; // TODO: replace with walk of aborted BTree + long minTimestamp = Long.MAX_VALUE; private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) { @@ -462,6 +463,7 @@ public class AtomicBTreeColumns extends ColumnFamily if (inserted == null) inserted = new ArrayList<>(); inserted.add(insert); + minTimestamp = Math.min(minTimestamp, insert.timestamp()); return insert; } @@ -469,6 +471,11 @@ public class AtomicBTreeColumns extends ColumnFamily { Cell reconciled = existing.reconcile(update); indexer.update(existing, reconciled); + // pick the smallest timestamp because we want to be consistent with the logic applied when inserting + // a cell in apply(Cell insert) above. 
For example given 3 timestamps where T3 < T2 < T1 then we want + // [apply(T1) -> apply(T2) -> apply(T3)] and [apply(T3) -> apply(T2) -> apply(T1)] to both return the + // smallest value T3, see CompactionControllerTest.testMaxPurgeableTimestamp() + minTimestamp = Math.min(minTimestamp, update.timestamp()); if (existing != reconciled) { reconciled = reconciled.localCopy(metadata, allocator, writeOp); @@ -495,6 +502,7 @@ public class AtomicBTreeColumns extends ColumnFamily inserted.clear(); } reclaimer.cancel(); + minTimestamp = Long.MAX_VALUE; } protected void abort(Cell abort) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index e96a71e..fb4da72 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -87,6 +87,9 @@ public class Memtable implements Comparable private final long creationTime = System.currentTimeMillis(); private final long creationNano = System.nanoTime(); + // The smallest timestamp for all partitions stored in this memtable + private long minTimestamp = Long.MAX_VALUE; + // Record the comparator of the CFS at the creation of the memtable. This // is only used when a user update the CF comparator, to know if the // memtable was created with the new or old comparator. 
@@ -224,10 +227,11 @@ public class Memtable implements Comparable } } - final Pair pair = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer); - liveDataSize.addAndGet(initialSize + pair.left); + final AtomicBTreeColumns.ColumnUpdater updater = previous.addAllWithSizeDelta(cf, allocator, opGroup, indexer); + minTimestamp = Math.min(minTimestamp, updater.minTimestamp); + liveDataSize.addAndGet(initialSize + updater.dataSize); currentOperations.addAndGet(cf.getColumnCount() + (cf.isMarkedForDelete() ? 1 : 0) + cf.deletionInfo().rangeCount()); - return pair.right; + return updater.colUpdateTimeDelta; } // for debugging @@ -316,6 +320,11 @@ public class Memtable implements Comparable return creationTime; } + public long getMinTimestamp() + { + return minTimestamp; + } + class FlushRunnable extends DiskAwareRunnable { private final ReplayPosition context; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index e0278c9..00d1344 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -23,10 +23,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.lifecycle.SSTableIntervalTree; -import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.utils.AlwaysPresentFilter; @@ -96,7 +96,7 @@ public class CompactionController 
implements AutoCloseable * Finds expired sstables * * works something like this; - * 1. find "global" minTimestamp of overlapping sstables and compacting sstables containing any non-expired data + * 1. find "global" minTimestamp of overlapping sstables, compacting sstables and memtables containing any non-expired data * 2. build a list of fully expired candidates * 3. check if the candidates to be dropped actually can be dropped (maxTimestamp < global minTimestamp) * - if not droppable, remove from candidates @@ -135,8 +135,11 @@ public class CompactionController implements AutoCloseable minTimestamp = Math.min(minTimestamp, candidate.getMinTimestamp()); } + for (Memtable memtable : cfStore.getTracker().getView().getAllMemtables()) + minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp()); + // At this point, minTimestamp denotes the lowest timestamp of any relevant - // SSTable that contains a constructive value. candidates contains all the + // SSTable or Memtable that contains a constructive value. candidates contains all the // candidates with no constructive values. The ones out of these that have // (getMaxTimestamp() < minTimestamp) serve no purpose anymore. @@ -171,7 +174,8 @@ public class CompactionController implements AutoCloseable * @return the largest timestamp before which it's okay to drop tombstones for the given partition; * i.e., after the maxPurgeableTimestamp there may exist newer data that still needs to be suppressed * in other sstables. This returns the minimum timestamp for any SSTable that contains this partition and is not - * participating in this compaction, or LONG.MAX_VALUE if no such SSTable exists. + * participating in this compaction, or memtable that contains this partition, + * or Long.MAX_VALUE if no such SSTable or memtable exists. 
*/ public long maxPurgeableTimestamp(DecoratedKey key) { @@ -186,6 +190,13 @@ public class CompactionController implements AutoCloseable else if (sstable.getBloomFilter().isPresent(key)) min = Math.min(min, sstable.getMinTimestamp()); } + + for (Memtable memtable : cfs.getTracker().getView().getAllMemtables()) + { + ColumnFamily cf = memtable.getColumnFamily(key); + if (cf != null) + min = Math.min(min, memtable.getMinTimestamp()); + } return min; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 93505ae..ec82571 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -275,7 +275,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow RangeTombstone t = tombstone; tombstone = null; - if (t.timestamp() < getMaxPurgeableTimestamp() && t.data.isGcAble(controller.gcBefore)) + if (t.data.isGcAble(controller.gcBefore) && t.timestamp() < getMaxPurgeableTimestamp()) { indexBuilder.tombstoneTracker().update(t, true); return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b5d6d4f7/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java new file mode 100644 index 0000000..750a38e --- /dev/null +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionControllerTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compaction; + +import java.nio.ByteBuffer; +import java.util.Set; + +import com.google.common.collect.Sets; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.Util.cellname; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class CompactionControllerTest extends SchemaLoader +{ + private static final String KEYSPACE = "CompactionControllerTest"; + private static final String CF1 = "Standard1"; + private static final String CF2 = "Standard2"; + + @BeforeClass + public static void defineSchema() 
throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE, CF1), + SchemaLoader.standardCFMD(KEYSPACE, CF2)); + } + + @Test + public void testMaxPurgeableTimestamp() + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF1); + cfs.truncateBlocking(); + + ByteBuffer rowKey = ByteBufferUtil.bytes("k1"); + DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(rowKey); + + long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp + long timestamp2 = timestamp1 - 5; + long timestamp3 = timestamp2 - 5; // oldest timestamp + + // add to first memtable + applyMutation(CF1, rowKey, timestamp1); + + // check max purgeable timestamp without any sstables + try(CompactionController controller = new CompactionController(cfs, null, 0)) + { + assertEquals(timestamp1, controller.maxPurgeableTimestamp(key)); //memtable only + + cfs.forceBlockingFlush(); + assertEquals(Long.MAX_VALUE, controller.maxPurgeableTimestamp(key)); //no memtables and no sstables + } + + Set compacting = Sets.newHashSet(cfs.getSSTables()); // first sstable is compacting + + // create another sstable + applyMutation(CF1, rowKey, timestamp2); + cfs.forceBlockingFlush(); + + // check max purgeable timestamp when compacting the first sstable with and without a memtable + try (CompactionController controller = new CompactionController(cfs, compacting, 0)) + { + assertEquals(timestamp2, controller.maxPurgeableTimestamp(key)); //second sstable only + + applyMutation(CF1, rowKey, timestamp3); + + assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //second sstable and second memtable + } + + // check max purgeable timestamp again without any sstables but with different insertion orders on the memtable + cfs.forceBlockingFlush(); + + //newest to oldest + try (CompactionController controller = 
new CompactionController(cfs, null, 0)) + { + applyMutation(CF1, rowKey, timestamp1); + applyMutation(CF1, rowKey, timestamp2); + applyMutation(CF1, rowKey, timestamp3); + + assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only + } + + cfs.forceBlockingFlush(); + + //oldest to newest + try (CompactionController controller = new CompactionController(cfs, null, 0)) + { + applyMutation(CF1, rowKey, timestamp3); + applyMutation(CF1, rowKey, timestamp2); + applyMutation(CF1, rowKey, timestamp1); + + assertEquals(timestamp3, controller.maxPurgeableTimestamp(key)); //memtable only + } + } + + @Test + public void testGetFullyExpiredSSTables() + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF2); + cfs.truncateBlocking(); + + ByteBuffer rowKey = ByteBufferUtil.bytes("k1"); + + long timestamp1 = FBUtilities.timestampMicros(); // latest timestamp + long timestamp2 = timestamp1 - 5; + long timestamp3 = timestamp2 - 5; // oldest timestamp + + // create sstable with tombstone that should be expired in no older timestamps + applyDeleteMutation(CF2, rowKey, timestamp2); + cfs.forceBlockingFlush(); + + // first sstable with tombstone is compacting + Set compacting = Sets.newHashSet(cfs.getSSTables()); + + // create another sstable with more recent timestamp + applyMutation(CF2, rowKey, timestamp1); + cfs.forceBlockingFlush(); + + // second sstable is overlapping + Set overlapping = Sets.difference(Sets.newHashSet(cfs.getSSTables()), compacting); + + // the first sstable should be expired because the overlapping sstable is newer and the gc period is later + int gcBefore = (int) (System.currentTimeMillis() / 1000) + 5; + Set expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore); + assertNotNull(expired); + assertEquals(1, expired.size()); + assertEquals(compacting.iterator().next(), expired.iterator().next()); + + // however if we add an older mutation to the 
memtable then the sstable should not be expired + applyMutation(CF2, rowKey, timestamp3); + expired = CompactionController.getFullyExpiredSSTables(cfs, compacting, overlapping, gcBefore); + assertNotNull(expired); + assertEquals(0, expired.size()); + } + + private void applyMutation(String cf, ByteBuffer rowKey, long timestamp) + { + CellName colName = cellname("birthdate"); + ByteBuffer val = ByteBufferUtil.bytes(1L); + + Mutation rm = new Mutation(KEYSPACE, rowKey); + rm.add(cf, colName, val, timestamp); + rm.applyUnsafe(); + } + + private void applyDeleteMutation(String cf, ByteBuffer rowKey, long timestamp) + { + Mutation rm = new Mutation(KEYSPACE, rowKey); + rm.delete(cf, timestamp); + rm.applyUnsafe(); + } + + + +}