cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [1/2] cassandra git commit: Ensure memtable flush cannot expire commit log entries from its future
Date Fri, 12 Dec 2014 13:22:34 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk f35e9c255 -> 480cd39c2


Ensure memtable flush cannot expire commit log entries from its future

patch by benedict; reviewed by aweisburg for CASSANDRA-8383


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e3f6151
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e3f6151
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e3f6151

Branch: refs/heads/trunk
Commit: 7e3f6151abc96ceb1a2cac1bc117324c4de630e9
Parents: 5c69584
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Fri Dec 12 13:20:19 2014 +0000
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Fri Dec 12 13:20:19 2014 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 34 ++++++++---
 .../org/apache/cassandra/db/DataTracker.java    |  5 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 62 ++++++++++++--------
 .../cassandra/db/commitlog/CommitLog.java       |  4 +-
 5 files changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4cb6fb..18efc7e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383)
  * Make read "defrag" async to reclaim memtables (CASSANDRA-8459)
  * Remove tmplink files for offline compactions (CASSANDRA-8321)
  * Reduce maxHintsInProgress (CASSANDRA-8415)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 99940b7..08f7969 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import javax.management.*;
 
@@ -910,12 +911,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         final boolean flushSecondaryIndexes;
         final OpOrder.Barrier writeBarrier;
         final CountDownLatch latch = new CountDownLatch(1);
-        volatile ReplayPosition lastReplayPosition;
+        final ReplayPosition lastReplayPosition;
 
-        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier)
+        private PostFlush(boolean flushSecondaryIndexes, OpOrder.Barrier writeBarrier, ReplayPosition
lastReplayPosition)
         {
             this.writeBarrier = writeBarrier;
             this.flushSecondaryIndexes = flushSecondaryIndexes;
+            this.lastReplayPosition = lastReplayPosition;
         }
 
         public void run()
@@ -995,19 +997,36 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             memtables = new ArrayList<>();
 
             // submit flushes for the memtable for any indexed sub-cfses, and our own
-            final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
+            AtomicReference<ReplayPosition> lastReplayPositionHolder = new AtomicReference<>();
             for (ColumnFamilyStore cfs : concatWithIndexes())
             {
                 // switch all memtables, regardless of their dirty status, setting the barrier
                 // so that we can reach a coordinated decision about cleanliness once they
                 // are no longer possible to be modified
                 Memtable mt = cfs.data.switchMemtable(truncate);
-                mt.setDiscarding(writeBarrier, minReplayPosition);
+                mt.setDiscarding(writeBarrier, lastReplayPositionHolder);
                 memtables.add(mt);
             }
 
+            // we now attempt to define the lastReplayPosition; we do this by grabbing the
current limit from the CL
+            // and attempting to set the holder to this value. at the same time all writes
to the memtables are
+            // also maintaining this value, so if somebody sneaks ahead of us somehow (should
be rare) we simply retry,
+            // so that we know all operations prior to the position have not reached it yet
+            ReplayPosition lastReplayPosition;
+            while (true)
+            {
+                lastReplayPosition = new Memtable.LastReplayPosition(CommitLog.instance.getContext());
+                ReplayPosition currentLast = lastReplayPositionHolder.get();
+                if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <=
0)
+                    && lastReplayPositionHolder.compareAndSet(currentLast, lastReplayPosition))
+                    break;
+            }
+
+            // we then issue the barrier; this lets us wait for all operations started prior
to the barrier to complete;
+            // since this happens after wiring up the lastReplayPosition, we also know all
operations with earlier
+            // replay positions have also completed, i.e. the memtables are done and ready
to flush
             writeBarrier.issue();
-            postFlush = new PostFlush(!truncate, writeBarrier);
+            postFlush = new PostFlush(!truncate, writeBarrier, lastReplayPosition);
         }
 
         public void run()
@@ -1059,7 +1078,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             // signal the post-flush we've done our work
-            postFlush.lastReplayPosition = memtables.get(0).getLastReplayPosition();
             postFlush.latch.countDown();
         }
     }
@@ -1131,8 +1149,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         long start = System.nanoTime();
 
-        Memtable mt = data.getMemtableFor(opGroup);
-        final long timeDelta = mt.put(key, columnFamily, indexer, opGroup, replayPosition);
+        Memtable mt = data.getMemtableFor(opGroup, replayPosition);
+        final long timeDelta = mt.put(key, columnFamily, indexer, opGroup);
         maybeUpdateRowCache(key);
         metric.writeLatency.addNano(System.nanoTime() - start);
         if(timeDelta < Long.MAX_VALUE)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index 7df2b75..d086b47 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.*;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public class DataTracker
     }
 
     // get the Memtable that the ordered writeOp should be directed to
-    public Memtable getMemtableFor(OpOrder.Group opGroup)
+    public Memtable getMemtableFor(OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
         // since any new memtables appended to the list after we fetch it will be for operations
started
         // after us, we can safely assume that we will always find the memtable that 'accepts'
us;
@@ -65,7 +66,7 @@ public class DataTracker
         // assign operations to a memtable that was retired/queued before we started)
         for (Memtable memtable : view.get().liveMemtables)
         {
-            if (memtable.accepts(opGroup))
+            if (memtable.accepts(opGroup, replayPosition))
                 return memtable;
         }
         throw new AssertionError(view.get().liveMemtables.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 3ae5da4..eb04bea 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -61,10 +61,17 @@ public class Memtable
     // the write barrier for directing writes to this memtable during a switch
     private volatile OpOrder.Barrier writeBarrier;
     // the last ReplayPosition owned by this Memtable; all ReplayPositions lower are owned
by this or an earlier Memtable
-    private final AtomicReference<ReplayPosition> lastReplayPosition = new AtomicReference<>();
+    private volatile AtomicReference<ReplayPosition> lastReplayPosition;
     // the "first" ReplayPosition owned by this Memtable; this is inaccurate, and only used
as a convenience to prevent CLSM flushing wantonly
     private final ReplayPosition minReplayPosition = CommitLog.instance.getContext();
 
+    public static final class LastReplayPosition extends ReplayPosition
+    {
+        public LastReplayPosition(ReplayPosition copy) {
+            super(copy.segment, copy.position);
+        }
+    }
+
     // We index the memtable by RowPosition only for the purpose of being able
     // to select key range using Token.KeyBound. However put() ensures that we
     // actually only store DecoratedKey.
@@ -101,10 +108,10 @@ public class Memtable
         return currentOperations.get();
     }
 
-    void setDiscarding(OpOrder.Barrier writeBarrier, ReplayPosition minLastReplayPosition)
+    void setDiscarding(OpOrder.Barrier writeBarrier, AtomicReference<ReplayPosition>
lastReplayPosition)
     {
         assert this.writeBarrier == null;
-        this.lastReplayPosition.set(minLastReplayPosition);
+        this.lastReplayPosition = lastReplayPosition;
         this.writeBarrier = writeBarrier;
         allocator.setDiscarding();
     }
@@ -114,10 +121,34 @@ public class Memtable
         allocator.setDiscarded();
     }
 
-    public boolean accepts(OpOrder.Group opGroup)
+    // decide if this memtable should take the write, or if it should go to the next memtable
+    public boolean accepts(OpOrder.Group opGroup, ReplayPosition replayPosition)
     {
+        // if the barrier hasn't been set yet, then this memtable is still taking ALL writes
         OpOrder.Barrier barrier = this.writeBarrier;
-        return barrier == null || barrier.isAfter(opGroup);
+        if (barrier == null)
+            return true;
+        // if the barrier has been set, but is in the past, we are definitely destined for
a future memtable
+        if (!barrier.isAfter(opGroup))
+            return false;
+        // if we aren't durable we are directed only by the barrier
+        if (replayPosition == null)
+            return true;
+        while (true)
+        {
+            // otherwise we check if we are in the past/future wrt the CL boundary;
+            // if the boundary hasn't been finalised yet, we simply update it to the max
of
+            // its current value and ours; if it HAS been finalised, we simply accept its
judgement
+            // this permits us to coordinate a safe boundary, as the boundary choice is made
+            // atomically wrt our max() maintenance, so an operation cannot sneak into the
past
+            ReplayPosition currentLast = lastReplayPosition.get();
+            if (currentLast instanceof LastReplayPosition)
+                return currentLast.compareTo(replayPosition) >= 0;
+            if (currentLast != null && currentLast.compareTo(replayPosition) >=
0)
+                return true;
+            if (lastReplayPosition.compareAndSet(currentLast, replayPosition))
+                return true;
+        }
     }
 
     public boolean isLive()
@@ -150,22 +181,8 @@ public class Memtable
      *
      * replayPosition should only be null if this is a secondary index, in which case it
is *expected* to be null
      */
-    long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group
opGroup, ReplayPosition replayPosition)
+    long put(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer, OpOrder.Group
opGroup)
     {
-        if (replayPosition != null && writeBarrier != null)
-        {
-            // if the writeBarrier is set, we want to maintain lastReplayPosition; this is
an optimisation to avoid
-            // casing it for every write, but still ensure it is correct when writeBarrier.await()
completes.
-            while (true)
-            {
-                ReplayPosition last = lastReplayPosition.get();
-                if (last.compareTo(replayPosition) >= 0)
-                    break;
-                if (lastReplayPosition.compareAndSet(last, replayPosition))
-                    break;
-            }
-        }
-
         AtomicBTreeColumns previous = rows.get(key);
 
         if (previous == null)
@@ -274,11 +291,6 @@ public class Memtable
         return creationTime;
     }
 
-    public ReplayPosition getLastReplayPosition()
-    {
-        return lastReplayPosition.get();
-    }
-
     class FlushRunnable extends DiskAwareRunnable
     {
         private final ReplayPosition context;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e3f6151/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index ee9ca14..9b51a33 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -158,8 +158,8 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * @return a Future representing a ReplayPosition such that when it is ready,
-     * all Allocations created prior to the getContext call will be written to the log
+     * @return a ReplayPosition which, if >= one returned from add(), implies add() was
started
+     * (but not necessarily finished) prior to this call
      */
     public ReplayPosition getContext()
     {


Mime
View raw message