Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AA16AF231 for ; Fri, 12 Dec 2014 13:40:51 +0000 (UTC) Received: (qmail 64195 invoked by uid 500); 12 Dec 2014 13:40:51 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 64163 invoked by uid 500); 12 Dec 2014 13:40:51 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 64152 invoked by uid 99); 12 Dec 2014 13:40:51 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 12 Dec 2014 13:40:51 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3D553A293A8; Fri, 12 Dec 2014 13:40:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benedict@apache.org To: commits@cassandra.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: cassandra git commit: Ensure memtable flush cannot expire commit log entries from its future Date: Fri, 12 Dec 2014 13:40:51 +0000 (UTC) Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 5c6958462 -> 7e3f6151a Ensure memtable flush cannot expire commit log entries from its future patch by benedict; reviewed by aweisburg for CASSANDRA-8383 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e3f6151 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e3f6151 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e3f6151 Branch: refs/heads/cassandra-2.1 Commit: 7e3f6151abc96ceb1a2cac1bc117324c4de630e9 
Parents: 5c69584 Author: Benedict Elliott Smith Authored: Fri Dec 12 13:20:19 2014 +0000 Committer: Benedict Elliott Smith Committed: Fri Dec 12 13:20:19 2014 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 34 ++++++++--- .../org/apache/cassandra/db/DataTracker.java | 5 +- src/java/org/apache/cassandra/db/Memtable.java | 62 ++++++++++++-------- .../cassandra/db/commitlog/CommitLog.java | 4 +- 5 files changed, 69 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b4cb6fb..18efc7e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.3 + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383) * Make read "defrag" async to reclaim memtables (CASSANDRA-8459) * Remove tmplink files for offline compactions (CASSANDRA-8321) * Reduce maxHintsInProgress (CASSANDRA-8415) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 99940b7..08f7969 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import javax.management.*; @@ -910,12 +911,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean final boolean 
flushSecondaryIndexes; final OpOrder.Barrier writeBarrier; final CountDownLatch latch = new CountDownLatch(1); - volatile ReplayPosition lastReplayPosition; + final ReplayPosition lastReplayPosition; - private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier) + private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition lastReplayPosition) { this.writeBarrier = writeBarrier; this.flushSecondaryIndexes = flushSecondaryIndexes; + this.lastReplayPosition = lastReplayPosition; } public void run() @@ -995,19 +997,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean memtables = new ArrayList<>(); // submit flushes for the memtable for any indexed sub-cfses, and our own - final ReplayPosition minReplayPosition = CommitLog.instance.getContext(); + AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>(); for (ColumnFamilyStore cfs : concatWithIndexes()) { // switch all memtables, regardless of their dirty status, setting the barrier // so that we can reach a coordinated decision about cleanliness once they // are no longer possible to be modified Memtable mt = cfs.data.switchMemtable(truncate); - mt.setDiscarding(writeBarrier, minReplayPosition); + mt.setDiscarding(writeBarrier, lastReplayPositionHolder); memtables.add(mt); } + // we now attempt to define the lastReplayPosition; we do this by grabbing the current limit from the CL + // and attempting to set the holder to this value. 
at the same time all writes to the memtables are + // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry, + // so that we know all operations prior to the position have not reached it yet + ReplayPosition lastReplayPosition; + while (true) + { + lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext()); + ReplayPosition currentLast = lastReplayPositionHolder.get(); + if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0) + && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition)) + break; + } + + // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete; + // since this happens after wiring up the lastReplayPosition, we also know all operations with earlier + // replay positions have also completed, i.e. the memtables are done and ready to flush writeBarrier.issue(); - postFlush = new PostFlush(!truncate, writeBarrier); + postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition); } public void run() @@ -1059,7 +1078,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } // signal the post-flush we've done our work - postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition(); postFlush.latch.countDown(); } } @@ -1131,8 +1149,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { long start = System.nanoTime(); - Memtable mt = data.getMemtableFor(opGroup); - final long timeDelta = mt.put(key, columnFamily, indexer, opGroup, replayPosition); + Memtable mt = data.getMemtableFor(opGroup, replayPosition); + final long timeDelta = mt.put(key, columnFamily, indexer, opGroup); maybeUpdateRowCache(key); metric.writeLatency.addNano(System.nanoTime() - start); if(timeDelta < Long.MAX_VALUE) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/DataTracker.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index 7df2b75..d086b47 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.common.base.Predicate; import com.google.common.collect.*; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +55,7 @@ public class DataTracker } // get the Memtable that the ordered writeOp should be directed to - public Memtable getMemtableFor(OpOrder.Group opGroup) + public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition) { // since any new memtables appended to the list after we fetch it will be for operations started // after us, we can safely assume that we will always find the memtable that 'accepts' us; @@ -65,7 +66,7 @@ public class DataTracker // assign operations to a memtable that was retired/queued before we started) for (Memtable memtable : view.get().liveMemtables) { - if (memtable.accepts(opGroup)) + if (memtable.accepts(opGroup, replayPosition)) return memtable; } throw new AssertionError(view.get().liveMemtables.toString()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 3ae5da4..eb04bea 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -61,10 +61,17 @@ public class Memtable // the write barrier for directing writes to this memtable during a switch private volatile OpOrder.Barrier writeBarrier; // the last ReplayPosition owned by 
this Memtable; all ReplayPositions lower are owned by this or an earlier Memtable - private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>(); + private volatile AtomicReference<ReplayPosition> lastReplayPosition; // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used as a convenience to prevent CLSM flushing wantonly private final ReplayPosition minReplayPosition = CommitLog.instance.getContext(); + public static final class LastReplayPosition extends ReplayPosition + { + public LastReplayPosition(ReplayPosition copy) { + super(copy.segment, copy.position); + } + } + // We index the memtable by RowPosition only for the purpose of being able // to select key range using Token.KeyBound. However put() ensures that we // actually only store DecoratedKey. @@ -101,10 +108,10 @@ public class Memtable return currentOperations.get(); } - void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition) + void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition> lastReplayPosition) { assert this.writeBarrier == null; - this.lastReplayPosition.set(minLastReplayPosition); + this.lastReplayPosition = lastReplayPosition; this.writeBarrier = writeBarrier; allocator.setDiscarding(); } @@ -114,10 +121,34 @@ public class Memtable allocator.setDiscarded(); } - public boolean accepts(OpOrder.Group opGroup) + // decide if this memtable should take the write, or if it should go to the next memtable + public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition) { + // if the barrier hasn't been set yet, then this memtable is still taking ALL writes OpOrder.Barrier barrier = this.writeBarrier; - return barrier == null || barrier.isAfter(opGroup); + if (barrier == null) + return true; + // if the barrier has been set, but is in the past, we are definitely destined for a future memtable + if (!barrier.isAfter(opGroup)) + return false; + // if we aren't durable we are directed only by the barrier + if 
(replayPosition == null) + return true; + while (true) + { + // otherwise we check if we are in the past/future wrt the CL boundary; + // if the boundary hasn't been finalised yet, we simply update it to the max of + // its current value and ours; if it HAS been finalised, we simply accept its judgement + // this permits us to coordinate a safe boundary, as the boundary choice is made + // atomically wrt our max() maintenance, so an operation cannot sneak into the past + ReplayPosition currentLast = lastReplayPosition.get(); + if (currentLast instanceof LastReplayPosition) + return currentLast.compareTo(replayPosition) >= 0; + if (currentLast != null && currentLast.compareTo(replayPosition) >= 0) + return true; + if (lastReplayPosition.compareAndSet(currentLast, replayPosition)) + return true; + } } public boolean isLive() @@ -150,22 +181,8 @@ public class Memtable * * replayPosition should only be null if this is a secondary index, in which case it is *expected* to be null */ - long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup, ReplayPosition replayPosition) + long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group opGroup) { - if (replayPosition != null && writeBarrier != null) - { - // if the writeBarrier is set, we want to maintain lastReplayPosition; this is an optimisation to avoid - // casing it for every write, but still ensure it is correct when writeBarrier.await() completes. 
- while (true) - { - ReplayPosition last = lastReplayPosition.get(); - if (last.compareTo(replayPosition) >= 0) - break; - if (lastReplayPosition.compareAndSet(last, replayPosition)) - break; - } - } - AtomicBTreeColumns previous = rows.get(key); if (previous == null) @@ -274,11 +291,6 @@ public class Memtable return creationTime; } - public ReplayPosition getLastReplayPosition() - { - return lastReplayPosition.get(); - } - class FlushRunnable extends DiskAwareRunnable { private final ReplayPosition context; http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index ee9ca14..9b51a33 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -158,8 +158,8 @@ public class CommitLog implements CommitLogMBean } /** - * @return a Future representing a ReplayPosition such that when it is ready, - * all Allocations created prior to the getContext call will be written to the log + * @return a ReplayPosition which, if >= one returned from add(), implies add() was started + * (but not necessarily finished) prior to this call */ public ReplayPosition getContext() {