cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [3/6] git commit: Tuning knobs for dealing with large blobs and many CFs patch by jbellis; reviewed by yukim and Jeremiah Jordan
Date Tue, 17 Sep 2013 17:14:38 GMT
Tuning knobs for dealing with large blobs and many CFs
patch by jbellis; reviewed by yukim and Jeremiah Jordan


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0

Branch: refs/heads/trunk
Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd
Parents: 0804b76
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Tue Sep 17 12:06:18 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Tue Sep 17 12:07:54 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 conf/cassandra.yaml                             |   6 +-
 .../org/apache/cassandra/config/Config.java     |   1 +
 .../cassandra/config/DatabaseDescriptor.java    |   8 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   4 +-
 src/java/org/apache/cassandra/db/Memtable.java  | 133 ++++++++++---------
 .../org/apache/cassandra/db/MeteredFlusher.java |  22 ++-
 .../PeriodicCommitLogExecutorService.java       |   2 +-
 .../apache/cassandra/utils/StatusLogger.java    |   9 +-
 9 files changed, 106 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ad32460..fb4f3f4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.2.10
+ * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982)
  * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002)
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
  * Allow local batchlog writes for CL.ANY (CASSANDRA-5967)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index d52f2e8..27ac09b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
-# milliseconds.
+# milliseconds.  In this mode, the commitlog_periodic_queue_size setting
+# below caps pending commitlog entries; it defaults to 1024*(CPU cores).
+# If you are writing very large blobs, reduce it; 16*(CPU cores) works
+# reasonably well for 1MB blobs.  It should be at least concurrent_writes.
 commitlog_sync: periodic
 commitlog_sync_period_in_ms: 10000
+# commitlog_periodic_queue_size:
 
 # The size of the individual commitlog file segments.  A commitlog
 # segment may be archived, deleted, or recycled once all the data

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 74b941d..a924a4c 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -126,6 +126,7 @@ public class Config
     public Double commitlog_sync_batch_window_in_ms;
     public Integer commitlog_sync_period_in_ms;
     public int commitlog_segment_size_in_mb = 32;
+    public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors();
 
     public String endpoint_snitch;
     public Boolean dynamic_snitch = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 1412888..8e3cbe2 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1047,10 +1047,16 @@ public class DatabaseDescriptor
         return conf.commitlog_sync_batch_window_in_ms;
     }
 
-    public static int getCommitLogSyncPeriod() {
+    public static int getCommitLogSyncPeriod()
+    {
         return conf.commitlog_sync_period_in_ms;
     }
 
+    public static int getCommitLogPeriodicQueueSize()
+    {
+        return conf.commitlog_periodic_queue_size;
+    }
+
     public static Config.CommitLogSync getCommitLogSync()
     {
         return conf.commitlog_sync;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 745b5ba..c646461 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final Directories directories;
 
     /** ratio of in-memory memtable size, to serialized size */
-    volatile double liveRatio = 1.0;
+    volatile double liveRatio = 10.0; // reasonable default until we compute what it is based
on actual data
     /** ops count last time we computed liveRatio */
     private final AtomicLong liveRatioComputedAt = new AtomicLong(32);
 
@@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return (int) metric.memtableSwitchCount.count();
     }
 
-    private Memtable getMemtableThreadSafe()
+    Memtable getMemtableThreadSafe()
     {
         return data.getMemtable();
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index df06cfb..cbe20fe 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.StageManager;
@@ -84,24 +83,15 @@ public class Memtable
     // outstanding/running meterings to a maximum of one per CFS using this set; the executor's
queue is unbounded but
     // will implicitly be bounded by the number of CFS:s.
     private static final Set<ColumnFamilyStore> meteringInProgress = new NonBlockingHashSet<ColumnFamilyStore>();
-    private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1,
-                                                                                        
 1,
+    private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                         
 Integer.MAX_VALUE,
                                                                                         
 TimeUnit.MILLISECONDS,
                                                                                         
 new LinkedBlockingQueue<Runnable>(),
-                                                                                        
 new NamedThreadFactory("MemoryMeter"))
-    {
-        @Override
-        protected void afterExecute(Runnable r, Throwable t)
-        {
-            super.afterExecute(r, t);
-            DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
-        }
-    };
-
+                                                                                        
 new NamedThreadFactory("MemoryMeter"),
+                                                                                        
 "internal");
     private final MemoryMeter meter;
 
-    volatile static Memtable activelyMeasuring;
+    volatile static ColumnFamilyStore activelyMeasuring;
 
     private final AtomicLong currentSize = new AtomicLong(0);
     private final AtomicLong currentOperations = new AtomicLong(0);
@@ -185,8 +175,9 @@ public class Memtable
         if (!MemoryMeter.isInitialized())
         {
             // hack for openjdk.  we log a warning about this in the startup script too.
-            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming
liveRatio of 10.0.  Usually this means cassandra-env.sh disabled jamm because you are using
a buggy JRE; upgrade to the Sun JRE instead");
-            cfs.liveRatio = 10.0;
+            logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming
liveRatio of {}.  "
+                        + " Usually this means cassandra-env.sh disabled jamm because you
are using a buggy JRE; "
+                        + " upgrade to the Sun JRE instead", cfs.liveRatio);
             return;
         }
 
@@ -196,56 +187,7 @@ public class Memtable
             return;
         }
 
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                try
-                {
-                    activelyMeasuring = Memtable.this;
-
-                    long start = System.currentTimeMillis();
-                    // ConcurrentSkipListMap has cycles, so measureDeep will have to track
a reference to EACH object it visits.
-                    // So to reduce the memory overhead of doing a measurement, we break
it up to row-at-a-time.
-                    long deepSize = meter.measure(columnFamilies);
-                    int objects = 0;
-                    for (Map.Entry<RowPosition, ColumnFamily> entry : columnFamilies.entrySet())
-                    {
-                        deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue());
-                        objects += entry.getValue().getColumnCount();
-                    }
-                    double newRatio = (double) deepSize / currentSize.get();
-
-                    if (newRatio < MIN_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to minimum of {} instead of {}",
MIN_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MIN_SANE_LIVE_RATIO;
-                    }
-                    if (newRatio > MAX_SANE_LIVE_RATIO)
-                    {
-                        logger.warn("setting live ratio to maximum of {} instead of {}",
MAX_SANE_LIVE_RATIO, newRatio);
-                        newRatio = MAX_SANE_LIVE_RATIO;
-                    }
-
-                    // we want to be very conservative about our estimate, since the penalty
for guessing low is OOM
-                    // death.  thus, higher estimates are believed immediately; lower ones
are averaged w/ the old
-                    if (newRatio > cfs.liveRatio)
-                        cfs.liveRatio = newRatio;
-                    else
-                        cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
-
-                    logger.info("{} liveRatio is {} (just-counted was {}).  calculation took
{}ms for {} columns",
-                                cfs, cfs.liveRatio, newRatio, System.currentTimeMillis()
- start, objects);
-                    activelyMeasuring = null;
-                }
-                finally
-                {
-                    meteringInProgress.remove(cfs);
-                }
-            }
-        };
-
-        meterExecutor.submit(runnable);
+        meterExecutor.submit(new MeteringRunnable(cfs));
     }
 
     private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater
indexer)
@@ -520,4 +462,63 @@ public class Memtable
                                      sstableMetadataCollector);
         }
     }
+
+    private static class MeteringRunnable implements Runnable
+    {
+        // we might need to wait in the meter queue for a while.  measure whichever memtable
is active at that point,
+        // rather than keeping the original memtable referenced (and thus un-freeable) until
this runs.
+        private final ColumnFamilyStore cfs;
+
+        public MeteringRunnable(ColumnFamilyStore cfs)
+        {
+            this.cfs = cfs;
+        }
+
+        public void run()
+        {
+            try
+            {
+                activelyMeasuring = cfs;
+                Memtable memtable = cfs.getMemtableThreadSafe();
+
+                long start = System.currentTimeMillis();
+                // ConcurrentSkipListMap has cycles, so measureDeep will have to track a
reference to EACH object it visits.
+                // So to reduce the memory overhead of doing a measurement, we break it up
to row-at-a-time.
+                long deepSize = memtable.meter.measure(memtable.columnFamilies);
+                int objects = 0;
+                for (Map.Entry<RowPosition, ColumnFamily> entry : memtable.columnFamilies.entrySet())
+                {
+                    deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue());
+                    objects += entry.getValue().getColumnCount();
+                }
+                double newRatio = (double) deepSize / memtable.currentSize.get();
+
+                if (newRatio < MIN_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO,
newRatio);
+                    newRatio = MIN_SANE_LIVE_RATIO;
+                }
+                if (newRatio > MAX_SANE_LIVE_RATIO)
+                {
+                    logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO,
newRatio);
+                    newRatio = MAX_SANE_LIVE_RATIO;
+                }
+
+                // we want to be very conservative about our estimate, since the penalty
for guessing low is OOM
+                // death.  thus, higher estimates are believed immediately; lower ones are
averaged w/ the old
+                if (newRatio > cfs.liveRatio)
+                    cfs.liveRatio = newRatio;
+                else
+                    cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0;
+
+                logger.info("{} liveRatio is {} (just-counted was {}).  calculation took
{}ms for {} columns",
+                            cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start,
objects);
+            }
+            finally
+            {
+                activelyMeasuring = null;
+                meteringInProgress.remove(cfs);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 408727c..f16b8a0 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable
 
     public void run()
     {
+        long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() *
1048576L;
+
         // first, find how much memory non-active memtables are using
-        Memtable activelyMeasuring = Memtable.activelyMeasuring;
-        long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize();
+        long flushingBytes = Memtable.activelyMeasuring == null
+                           ? 0
+                           : Memtable.activelyMeasuring.getMemtableThreadSafe().getLiveSize();
         flushingBytes += countFlushingBytes();
+        if (flushingBytes > 0)
+            logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
 
         // next, flush CFs using more than 1 / (maximum number of memtables it could have
in the pipeline)
         // of the total size allotted.  Then, flush other CFs in order of size if necessary.
         long liveBytes = 0;
         try
         {
+            long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
             {
                 long size = cfs.getTotalMemtableLiveSize();
@@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable
                                                             + DatabaseDescriptor.getFlushWriters()
                                                             + DatabaseDescriptor.getFlushQueueSize())
                                                   / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
-                if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L
- flushingBytes) / maxInFlight)
+                if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused
/ maxInFlight)
                 {
                     logger.info("flushing high-traffic column family {} (estimated {} bytes)",
cfs, size);
                     cfs.forceFlush();
@@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable
                 }
             }
 
-            if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB()
* 1048576L)
+            if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
                 return;
 
-            logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes);
+            logger.info("estimated {} live and {} flushing bytes used by all memtables",
liveBytes, flushingBytes);
 
             // sort memtables by size
             List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
@@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable
             // flush largest first until we get below our threshold.
             // although it looks like liveBytes + flushingBytes will stay a constant, it
will not if flushes finish
             // while we loop, which is especially likely to happen if the flush queue fills
up (so further forceFlush calls block)
-            while (true)
+            while (!sorted.isEmpty())
             {
                 flushingBytes = countFlushingBytes();
-                if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB()
* 1048576L || sorted.isEmpty())
+                if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
                     break;
 
                 ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
                 long size = cfs.getTotalMemtableLiveSize();
+                if (size == 0)
+                    break;
                 logger.info("flushing {} to free up {} bytes", cfs, size);
                 liveBytes -= size;
                 cfs.forceFlush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
index 94f593e..48dbfac 100644
--- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java
@@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService
 
     public PeriodicCommitLogExecutorService(final CommitLog commitLog)
     {
-        queue = new LinkedBlockingQueue<Runnable>(1024 * FBUtilities.getAvailableProcessors());
+        queue = new LinkedBlockingQueue<Runnable>(DatabaseDescriptor.getCommitLogPeriodicQueueSize());
         Runnable runnable = new WrappedRunnable()
         {
             public void runMayThrow() throws Exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index cf5c5fa..15c4811 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
@@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.RowIndexEntry;
+import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.CacheService;
@@ -74,9 +78,10 @@ public class StatusLogger
                                       threadPoolProxy.getTotalBlockedTasks()));
         }
         // one offs
-        CompactionManager cm = CompactionManager.instance;
         logger.info(String.format("%-25s%10s%10s",
-                                  "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks()));
+                                  "CompactionManager", CompactionManager.instance.getActiveCompactions(),
CompactionManager.instance.getPendingTasks()));
+        logger.info(String.format("%-25s%10s%10s",
+                                  "Commitlog", "n/a", CommitLog.instance.getPendingTasks()));
         int pendingCommands = 0;
         for (int n : MessagingService.instance().getCommandPendingTasks().values())
         {


Mime
View raw message