cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/6] git commit: avoid blocking additional writes during flush patch by jbellis; reviewed by slebresnse and tested by brandonwilliams for CASSANDRA-1991
Date Wed, 09 May 2012 19:52:32 GMT
Updated Branches:
  refs/heads/cassandra-1.1 861f1f3a9 -> 08848e795
  refs/heads/trunk ca104bac3 -> 4d7e70356


avoid blocking additional writes during flush
patch by jbellis; reviewed by slebresnse and tested by brandonwilliams for CASSANDRA-1991


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/08848e79
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/08848e79
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/08848e79

Branch: refs/heads/cassandra-1.1
Commit: 08848e7956f5fd08525a08498205637b2652f2a7
Parents: 67ed39f
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Wed May 9 14:51:24 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Wed May 9 14:52:18 2012 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   12 +++---
 src/java/org/apache/cassandra/db/Memtable.java     |   12 +++---
 .../apache/cassandra/db/commitlog/CommitLog.java   |   19 +++--------
 .../db/compaction/LeveledCompactionStrategy.java   |   25 +++++++++++++--
 .../cassandra/db/compaction/LeveledManifest.java   |    9 +++++
 .../org/apache/cassandra/db/CommitLogTest.java     |    6 ++--
 7 files changed, 53 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f17ffd1..9246433 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 1.1.1-dev
+ * avoid blocking additional writes during flush when the commitlog
+   gets behind temporarily (CASSANDRA-1991)
  * enable caching on index CFs based on data CF cache setting (CASSANDRA-4197)
  * warn on invalid replication strategy creation options (CASSANDRA-4046)
  * remove [Freeable]Memory finalizers (CASSANDRA-4222)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 659be73..9dcf1ef 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -31,6 +31,7 @@ import java.util.regex.Pattern;
 import javax.management.*;
 
 import com.google.common.collect.*;
+import com.google.common.util.concurrent.Futures;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -609,8 +610,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
 
             assert getMemtableThreadSafe() == oldMemtable;
-            final ReplayPosition ctx = writeCommitLog ? CommitLog.instance.getContext() : ReplayPosition.NONE;
-            logger.debug("flush position is {}", ctx);
+            final Future<ReplayPosition> ctx = writeCommitLog ? CommitLog.instance.getContext() : Futures.immediateFuture(ReplayPosition.NONE);
 
             // submit the memtable for any indexed sub-cfses, and our own.
             final List<ColumnFamilyStore> icc = new ArrayList<ColumnFamilyStore>();
@@ -642,7 +642,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             // while keeping the wait-for-flush (future.get) out of anything latency-sensitive.
             return postFlushExecutor.submit(new WrappedRunnable()
             {
-                public void runMayThrow() throws InterruptedException, IOException
+                public void runMayThrow() throws InterruptedException, IOException, ExecutionException
                 {
                     latch.await();
 
@@ -662,7 +662,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     {
                         // if we're not writing to the commit log, we are replaying the log, so marking
                         // the log header with "you can discard anything written before the context" is not valid
-                        CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx);
+                        CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx.get());
                     }
                 }
             });
@@ -1710,13 +1710,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         if (ksm.durableWrites)
         {
             CommitLog.instance.forceNewSegment();
-            ReplayPosition position = CommitLog.instance.getContext();
+            Future<ReplayPosition> position = CommitLog.instance.getContext();
             // now flush everyone else.  re-flushing ourselves is not necessary, but harmless
             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
                 cfs.forceFlush();
             waitForActiveFlushes();
             // if everything was clean, flush won't have called discard
-            CommitLog.instance.discardCompletedSegments(metadata.cfId, position);
+            CommitLog.instance.discardCompletedSegments(metadata.cfId, position.get());
         }
 
         // sleep a little to make sure that our truncatedAt comes after any sstable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 1eddf15..f1dcc56 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -261,7 +261,7 @@ public class Memtable
     }
 
 
-    private SSTableReader writeSortedContents(ReplayPosition context) throws IOException
+    private SSTableReader writeSortedContents(Future<ReplayPosition> context) throws IOException, ExecutionException, InterruptedException
     {
         logger.info("Writing " + this);
 
@@ -278,7 +278,7 @@ public class Memtable
                                      * 1.2); // bloom filter and row index overhead
         SSTableReader ssTable;
         // errors when creating the writer that may leave empty temp files.
-        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context);
+        SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), estimatedSize, context.get());
         try
         {
             // (we can't clear out the map as-we-go to free up memory,
@@ -304,16 +304,16 @@ public class Memtable
             writer.abort();
             throw FBUtilities.unchecked(e);
         }
-        logger.info(String.format("Completed flushing %s (%d bytes)",
-                                  ssTable.getFilename(), new File(ssTable.getFilename()).length()));
+        logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
+                                  ssTable.getFilename(), new File(ssTable.getFilename()).length(), context.get()));
         return ssTable;
     }
 
-    public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final ReplayPosition context)
+    public void flushAndSignal(final CountDownLatch latch, ExecutorService writer, final Future<ReplayPosition> context)
     {
         writer.execute(new WrappedRunnable()
         {
-            public void runMayThrow() throws IOException
+            public void runMayThrow() throws Exception
             {
                 SSTableReader sstable = writeSortedContents(context);
                 cfs.replaceFlushed(Memtable.this, sstable);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index 3c34772..a490569 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -23,6 +23,7 @@ import java.lang.management.ManagementFactory;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.*;
@@ -159,9 +160,10 @@ public class CommitLog implements CommitLogMBean
     }
 
     /**
-     * @return the current ReplayPosition of the current segment file
+     * @return a Future representing a ReplayPosition such that when it is ready,
+     * all commitlog tasks enqueued prior to the getContext call will be complete (i.e., appended to the log)
      */
-    public ReplayPosition getContext()
+    public Future<ReplayPosition> getContext()
     {
         Callable<ReplayPosition> task = new Callable<ReplayPosition>()
         {
@@ -170,18 +172,7 @@ public class CommitLog implements CommitLogMBean
                 return activeSegment.getContext();
             }
         };
-        try
-        {
-            return executor.submit(task).get();
-        }
-        catch (InterruptedException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (ExecutionException e)
-        {
-            throw new RuntimeException(e);
-        }
+        return executor.submit(task);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 858a2bc..5403aa2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -172,21 +172,40 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
     {
         Multimap<Integer, SSTableReader> byLevel = ArrayListMultimap.create();
         for (SSTableReader sstable : sstables)
-            byLevel.get(manifest.levelOf(sstable)).add(sstable);
+        {
+            int level = manifest.levelOf(sstable);
+            assert level >= 0;
+            byLevel.get(level).add(sstable);
+        }
 
         List<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>(sstables.size());
         for (Integer level : byLevel.keySet())
         {
             if (level == 0)
             {
-                // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each
+                // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner for each.
                 for (SSTableReader sstable : byLevel.get(level))
                     scanners.add(sstable.getDirectScanner(range));
             }
             else
             {
                 // Create a LeveledScanner that only opens one sstable at a time, in sorted order
-                scanners.add(new LeveledScanner(byLevel.get(level), range));
+                ArrayList<SSTableReader> sstables1 = new ArrayList<SSTableReader>(byLevel.get(level));
+                scanners.add(new LeveledScanner(sstables1, range));
+
+                Collections.sort(sstables1, SSTable.sstableComparator);
+                SSTableReader previous = null;
+                for (SSTableReader sstable : sstables1)
+                {
+                    assert previous == null || sstable.first.compareTo(previous.last) > 0 : String.format("%s >= %s in %s and %s for %s in %s",
+                                                                                                          previous.last,
+                                                                                                          sstable.first,
+                                                                                                          previous,
+                                                                                                          sstable,
+                                                                                                          sstable.getColumnFamilyName(),
+                                                                                                          manifest.getLevel(level));
+                    previous = sstable;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 69ab492..c3517e1 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -188,6 +189,14 @@ public class LeveledManifest
         for (SSTableReader ssTableReader : added)
             add(ssTableReader, newLevel);
 
+        DecoratedKey last = null;
+        Collections.sort(generations[newLevel], SSTable.sstableComparator);
+        for (SSTableReader sstable : generations[newLevel])
+        {
+            assert last == null || sstable.first.compareTo(last) > 0;
+            last = sstable.last;
+        }
+
         serialize();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/08848e79/test/unit/org/apache/cassandra/db/CommitLogTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java
index e4b04a4..4e48d73 100644
--- a/test/unit/org/apache/cassandra/db/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/CommitLogTest.java
@@ -112,7 +112,7 @@ public class CommitLogTest extends SchemaLoader
         assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
 
         int cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext().get());
 
         // Assert we still have both our segment
         assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments();
@@ -134,7 +134,7 @@ public class CommitLogTest extends SchemaLoader
 
         // "Flush": this won't delete anything
         int cfid1 = rm.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext().get());
 
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();
 
@@ -152,7 +152,7 @@ public class CommitLogTest extends SchemaLoader
         // didn't write anything on cf1 since last flush (and we flush cf2)
 
         int cfid2 = rm2.getColumnFamilyIds().iterator().next();
-        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext());
+        CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext().get());
 
         // Assert we still have both our segment
         assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments();


Mime
View raw message