cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [17/21] git commit: make memory metering use an unbounded queue to avoid blocking the write path patch by pschuller and jbellis; reviewed by slebresne for CASSANDRA-4032
Date Wed, 11 Apr 2012 18:26:43 GMT
make memory metering use an unbounded queue to avoid blocking the write path
patch by pschuller and jbellis; reviewed by slebresne for CASSANDRA-4032


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fd3bfac6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fd3bfac6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fd3bfac6

Branch: refs/heads/cassandra-1.1.0
Commit: fd3bfac6cbc487e36ac1c39740c5897e350d0d16
Parents: d49113f
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Tue Apr 10 10:59:35 2012 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Wed Apr 11 13:24:09 2012 -0500

----------------------------------------------------------------------
 src/java/org/apache/cassandra/db/Memtable.java |   86 ++++++++++--------
 1 files changed, 48 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd3bfac6/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 81dac7c..d9e9570 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.SlabAllocator;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.github.jamm.MemoryMeter;
 
 public class Memtable
@@ -53,14 +54,17 @@ public class Memtable
     // max liveratio seen w/ 1-byte columns on a 64-bit jvm was 19. If it gets higher than
64 something is probably broken.
     private static final double MAX_SANE_LIVE_RATIO = 64.0;
 
-    // we're careful to only allow one count to run at a time because counting is slow
-    // (can be minutes, for a large memtable and a busy server), so we could keep memtables
-    // alive after they're flushed and would otherwise be GC'd.
+    // we want to limit the amount of concurrently running and/or queued meterings, because
counting is slow (can be
+    // minutes, for a large memtable and a busy server). so we could keep memtables
+    // alive after they're flushed and would otherwise be GC'd. the approach we take is to
bound the number of
+    // outstanding/running meterings to a maximum of one per CFS using this set; the executor's
queue is unbounded but
+    // will implicitly be bounded by the number of CFS:s.
+    private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
     private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
                                                                                         
 1,
                                                                                         
 Integer.MAX_VALUE,
                                                                                         
 TimeUnit.MILLISECONDS,
-                                                                                        
 new SynchronousQueue<Runnable>(),
+                                                                                        
 new LinkedBlockingQueue<Runnable>(),
                                                                                         
 new NamedThreadFactory("MemoryMeter"))
     {
         @Override
@@ -152,7 +156,7 @@ public class Memtable
         resolve(key, columnFamily);
     }
 
-    public void updateLiveRatio()
+    public void updateLiveRatio() throws RuntimeException
     {
         if (!MemoryMeter.isInitialized())
         {
@@ -162,50 +166,56 @@ public class Memtable
             return;
         }
 
+        if (!meteringInProgress.add(cfs))
+        {
+            logger.debug("Metering already pending or active for {}; skipping liveRatio update",
cfs);
+            return;
+        }
+
         Runnable runnable = new Runnable()
         {
             public void run()
             {
-                activelyMeasuring = Memtable.this;
-
-                long start = System.currentTimeMillis();
-                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a
reference to EACH object it visits.
-                // So to reduce the memory overhead of doing a measurement, we break it up
to row-at-a-time.
-                long deepSize = meter.measure(columnFamilies);
-                int objects = 0;
-                for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-                {
-                    deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
-                    objects += entry.getValue().getColumnCount();
-                }
-                double newRatio = (double) deepSize / currentThroughput.get();
-
-                if (newRatio < MIN_SANE_LIVE_RATIO)
+                try
                 {
-                    logger.warn("setting live ratio to minimum of 1.0 instead of {}", newRatio);
-                    newRatio = MIN_SANE_LIVE_RATIO;
+                    activelyMeasuring = Memtable.this;
+
+                    long start = System.currentTimeMillis();
+                    // ConcurrentSkipListMap has cycles, so measureDeep will have to track
a reference to EACH object it visits.
+                    // So to reduce the memory overhead of doing a measurement, we break
it up to row-at-a-time.
+                    long deepSize = meter.measure(columnFamilies);
+                    int objects = 0;
+                    for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
+                    {
+                        deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
+                        objects += entry.getValue().getColumnCount();
+                    }
+                    double newRatio = (double) deepSize / currentThroughput.get();
+
+                    if (newRatio < MIN_SANE_LIVE_RATIO)
+                    {
+                        logger.warn("setting live ratio to minimum of 1.0 instead of {}",
newRatio);
+                        newRatio = MIN_SANE_LIVE_RATIO;
+                    }
+                    if (newRatio > MAX_SANE_LIVE_RATIO)
+                    {
+                        logger.warn("setting live ratio to maximum of 64 instead of {}",
newRatio);
+                        newRatio = MAX_SANE_LIVE_RATIO;
+                    }
+                    cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
+
+                    logger.info("{} liveRatio is {} (just-counted was {}).  calculation took
{}ms for {} columns",
+                                new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis()
- start, objects });
+                    activelyMeasuring = null;
                 }
-                if (newRatio > MAX_SANE_LIVE_RATIO)
+                finally
                 {
-                    logger.warn("setting live ratio to maximum of 64 instead of {}", newRatio);
-                    newRatio = MAX_SANE_LIVE_RATIO;
+                    meteringInProgress.remove(cfs);
                 }
-                cfs.liveRatio = Math.max(cfs.liveRatio, newRatio);
-
-                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took
{}ms for {} columns",
-                            new Object[]{ cfs, cfs.liveRatio, newRatio, System.currentTimeMillis()
- start, objects });
-                activelyMeasuring = null;
             }
         };
 
-        try
-        {
-            meterExecutor.submit(runnable);
-        }
-        catch (RejectedExecutionException e)
-        {
-            logger.debug("Meter thread is busy; skipping liveRatio update for {}", cfs);
-        }
+        meterExecutor.submit(runnable);
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf)


Mime
View raw message