Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0169310998 for ; Tue, 17 Sep 2013 17:15:29 +0000 (UTC) Received: (qmail 74600 invoked by uid 500); 17 Sep 2013 17:14:56 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 74503 invoked by uid 500); 17 Sep 2013 17:14:48 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 74347 invoked by uid 99); 17 Sep 2013 17:14:37 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Sep 2013 17:14:37 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C5653A143; Tue, 17 Sep 2013 17:14:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Tue, 17 Sep 2013 17:14:38 -0000 Message-Id: In-Reply-To: <0dbd59c04c734da19ca3862d5e168bee@git.apache.org> References: <0dbd59c04c734da19ca3862d5e168bee@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/6] git commit: Tuning knobs for dealing with large blobs and many CFs patch by jbellis; reviewed by yukim and Jeremiah Jordan Tuning knobs for dealing with large blobs and many CFs patch by jbellis; reviewed by yukim and Jeremiah Jordan Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dc95c8c0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dc95c8c0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dc95c8c0 Branch: refs/heads/trunk Commit: dc95c8c0beb9309cfdfbabbd32ea1b74084daafd Parents: 0804b76 Author: Jonathan Ellis Authored: Tue Sep 17 12:06:18 2013 -0500 Committer: Jonathan Ellis Committed: Tue Sep 17 12:07:54 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + conf/cassandra.yaml | 6 +- .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 8 +- .../apache/cassandra/db/ColumnFamilyStore.java | 4 +- src/java/org/apache/cassandra/db/Memtable.java | 133 ++++++++++--------- .../org/apache/cassandra/db/MeteredFlusher.java | 22 ++- .../PeriodicCommitLogExecutorService.java | 2 +- .../apache/cassandra/utils/StatusLogger.java | 9 +- 9 files changed, 106 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ad32460..fb4f3f4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.10 + * Tuning knobs for dealing with large blobs and many CFs (CASSANDRA-5982) * (Hadoop) Fix CQLRW for thrift tables (CASSANDRA-6002) * Fix possible divide-by-zero in HHOM (CASSANDRA-5990) * Allow local batchlog writes for CL.ANY (CASSANDRA-5967) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index d52f2e8..27ac09b 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -206,9 +206,13 @@ saved_caches_directory: /var/lib/cassandra/saved_caches # # the other option is "periodic" where writes may be acked immediately # and the CommitLog is simply synced every commitlog_sync_period_in_ms -# milliseconds. +# milliseconds. By default this allows 1024*(CPU cores) pending +# entries on the commitlog queue. If you are writing very large blobs, +# you should reduce that; 16*cores works reasonably well for 1MB blobs. +# It should be at least as large as the concurrent_writes setting. commitlog_sync: periodic commitlog_sync_period_in_ms: 10000 +# commitlog_periodic_queue_size: # The size of the individual commitlog file segments. A commitlog # segment may be archived, deleted, or recycled once all the data http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 74b941d..a924a4c 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -126,6 +126,7 @@ public class Config public Double commitlog_sync_batch_window_in_ms; public Integer commitlog_sync_period_in_ms; public int commitlog_segment_size_in_mb = 32; + public int commitlog_periodic_queue_size = 1024 * FBUtilities.getAvailableProcessors(); public String endpoint_snitch; public Boolean dynamic_snitch = true; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 1412888..8e3cbe2 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1047,10 +1047,16 @@ public class DatabaseDescriptor return conf.commitlog_sync_batch_window_in_ms; } - public static int getCommitLogSyncPeriod() { + public static int getCommitLogSyncPeriod() + { return conf.commitlog_sync_period_in_ms; } + public static int getCommitLogPeriodicQueueSize() + { + return conf.commitlog_periodic_queue_size; + } + public static Config.CommitLogSync getCommitLogSync() { return conf.commitlog_sync; http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 745b5ba..c646461 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -104,7 +104,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public final Directories directories; /** ratio of in-memory memtable size, to serialized size */ - volatile double liveRatio = 1.0; + volatile double liveRatio = 10.0; // reasonable default until we compute what it is based on actual data /** ops count last time we computed liveRatio */ private final AtomicLong liveRatioComputedAt = new AtomicLong(32); @@ -1023,7 +1023,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return (int) metric.memtableSwitchCount.count(); } - private Memtable getMemtableThreadSafe() + Memtable getMemtableThreadSafe() { return data.getMemtable(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index df06cfb..cbe20fe 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -29,7 +29,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.concurrent.StageManager; @@ -84,24 +83,15 @@ public class Memtable // outstanding/running meterings to a maximum of one per CFS using this set; the executor's queue is unbounded but // will implicitly be bounded by the number of CFS:s. private static final Set meteringInProgress = new NonBlockingHashSet(); - private static final ExecutorService meterExecutor = new DebuggableThreadPoolExecutor(1, - 1, + private static final ExecutorService meterExecutor = new JMXEnabledThreadPoolExecutor(1, Integer.MAX_VALUE, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), - new NamedThreadFactory("MemoryMeter")) - { - @Override - protected void afterExecute(Runnable r, Throwable t) - { - super.afterExecute(r, t); - DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t); - } - }; - + new NamedThreadFactory("MemoryMeter"), + "internal"); private final MemoryMeter meter; - volatile static Memtable activelyMeasuring; + volatile static ColumnFamilyStore activelyMeasuring; private final AtomicLong currentSize = new AtomicLong(0); private final AtomicLong currentOperations = new AtomicLong(0); @@ -185,8 +175,9 @@ public class Memtable if (!MemoryMeter.isInitialized()) { // hack for openjdk. we log a warning about this in the startup script too. - logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of 10.0. Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; upgrade to the Sun JRE instead"); - cfs.liveRatio = 10.0; + logger.warn("MemoryMeter uninitialized (jamm not specified as java agent); assuming liveRatio of {}. " + + " Usually this means cassandra-env.sh disabled jamm because you are using a buggy JRE; " + + " upgrade to the Sun JRE instead", cfs.liveRatio); return; } @@ -196,56 +187,7 @@ public class Memtable return; } - Runnable runnable = new Runnable() - { - public void run() - { - try - { - activelyMeasuring = Memtable.this; - - long start = System.currentTimeMillis(); - // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits. - // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time. - long deepSize = meter.measure(columnFamilies); - int objects = 0; - for (Map.Entry entry : columnFamilies.entrySet()) - { - deepSize += meter.measureDeep(entry.getKey()) + meter.measureDeep(entry.getValue()); - objects += entry.getValue().getColumnCount(); - } - double newRatio = (double) deepSize / currentSize.get(); - - if (newRatio < MIN_SANE_LIVE_RATIO) - { - logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio); - newRatio = MIN_SANE_LIVE_RATIO; - } - if (newRatio > MAX_SANE_LIVE_RATIO) - { - logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio); - newRatio = MAX_SANE_LIVE_RATIO; - } - - // we want to be very conservative about our estimate, since the penalty for guessing low is OOM - // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old - if (newRatio > cfs.liveRatio) - cfs.liveRatio = newRatio; - else - cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0; - - logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns", - cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects); - activelyMeasuring = null; - } - finally - { - meteringInProgress.remove(cfs); - } - } - }; - - meterExecutor.submit(runnable); + meterExecutor.submit(new MeteringRunnable(cfs)); } private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer) @@ -520,4 +462,63 @@ public class Memtable sstableMetadataCollector); } } + + private static class MeteringRunnable implements Runnable + { + // we might need to wait in the meter queue for a while. measure whichever memtable is active at that point, + // rather than keeping the original memtable referenced (and thus un-freeable) until this runs. + private final ColumnFamilyStore cfs; + + public MeteringRunnable(ColumnFamilyStore cfs) + { + this.cfs = cfs; + } + + public void run() + { + try + { + activelyMeasuring = cfs; + Memtable memtable = cfs.getMemtableThreadSafe(); + + long start = System.currentTimeMillis(); + // ConcurrentSkipListMap has cycles, so measureDeep will have to track a reference to EACH object it visits. + // So to reduce the memory overhead of doing a measurement, we break it up to row-at-a-time. + long deepSize = memtable.meter.measure(memtable.columnFamilies); + int objects = 0; + for (Map.Entry entry : memtable.columnFamilies.entrySet()) + { + deepSize += memtable.meter.measureDeep(entry.getKey()) + memtable.meter.measureDeep(entry.getValue()); + objects += entry.getValue().getColumnCount(); + } + double newRatio = (double) deepSize / memtable.currentSize.get(); + + if (newRatio < MIN_SANE_LIVE_RATIO) + { + logger.warn("setting live ratio to minimum of {} instead of {}", MIN_SANE_LIVE_RATIO, newRatio); + newRatio = MIN_SANE_LIVE_RATIO; + } + if (newRatio > MAX_SANE_LIVE_RATIO) + { + logger.warn("setting live ratio to maximum of {} instead of {}", MAX_SANE_LIVE_RATIO, newRatio); + newRatio = MAX_SANE_LIVE_RATIO; + } + + // we want to be very conservative about our estimate, since the penalty for guessing low is OOM + // death. thus, higher estimates are believed immediately; lower ones are averaged w/ the old + if (newRatio > cfs.liveRatio) + cfs.liveRatio = newRatio; + else + cfs.liveRatio = (cfs.liveRatio + newRatio) / 2.0; + + logger.info("{} liveRatio is {} (just-counted was {}). calculation took {}ms for {} columns", + cfs, cfs.liveRatio, newRatio, System.currentTimeMillis() - start, objects); + } + finally + { + activelyMeasuring = null; + meteringInProgress.remove(cfs); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/MeteredFlusher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java index 408727c..f16b8a0 100644 --- a/src/java/org/apache/cassandra/db/MeteredFlusher.java +++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java @@ -35,16 +35,22 @@ public class MeteredFlusher implements Runnable public void run() { + long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L; + // first, find how much memory non-active memtables are using - Memtable activelyMeasuring = Memtable.activelyMeasuring; - long flushingBytes = activelyMeasuring == null ? 0 : activelyMeasuring.getLiveSize(); + long flushingBytes = Memtable.activelyMeasuring == null + ? 0 + : Memtable.activelyMeasuring.getMemtableThreadSafe().getLiveSize(); flushingBytes += countFlushingBytes(); + if (flushingBytes > 0) + logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed); // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline) // of the total size allotted. Then, flush other CFs in order of size if necessary. long liveBytes = 0; try { + long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes; for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { long size = cfs.getTotalMemtableLiveSize(); @@ -53,7 +59,7 @@ public class MeteredFlusher implements Runnable + DatabaseDescriptor.getFlushWriters() + DatabaseDescriptor.getFlushQueueSize()) / (1 + cfs.indexManager.getIndexesBackedByCfs().size())); - if (size > (DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L - flushingBytes) / maxInFlight) + if (totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight) { logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size); cfs.forceFlush(); @@ -64,10 +70,10 @@ public class MeteredFlusher implements Runnable } } - if (flushingBytes + liveBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L) + if (flushingBytes + liveBytes <= totalMemtableBytesAllowed) return; - logger.info("estimated {} bytes used by all memtables pre-flush", liveBytes); + logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes); // sort memtables by size List sorted = new ArrayList(); @@ -89,14 +95,16 @@ public class MeteredFlusher implements Runnable // flush largest first until we get below our threshold. // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block) - while (true) + while (!sorted.isEmpty()) { flushingBytes = countFlushingBytes(); - if (liveBytes + flushingBytes <= DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L || sorted.isEmpty()) + if (liveBytes + flushingBytes <= totalMemtableBytesAllowed) break; ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1); long size = cfs.getTotalMemtableLiveSize(); + if (size == 0) + break; logger.info("flushing {} to free up {} bytes", cfs, size); liveBytes -= size; cfs.forceFlush(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java index 94f593e..48dbfac 100644 --- a/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java +++ b/src/java/org/apache/cassandra/db/commitlog/PeriodicCommitLogExecutorService.java @@ -33,7 +33,7 @@ class PeriodicCommitLogExecutorService implements ICommitLogExecutorService public PeriodicCommitLogExecutorService(final CommitLog commitLog) { - queue = new LinkedBlockingQueue(1024 * FBUtilities.getAvailableProcessors()); + queue = new LinkedBlockingQueue(DatabaseDescriptor.getCommitLogPeriodicQueueSize()); Runnable runnable = new WrappedRunnable() { public void runMayThrow() throws Exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/dc95c8c0/src/java/org/apache/cassandra/utils/StatusLogger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java index cf5c5fa..15c4811 100644 --- a/src/java/org/apache/cassandra/utils/StatusLogger.java +++ b/src/java/org/apache/cassandra/utils/StatusLogger.java @@ -19,6 +19,8 @@ package org.apache.cassandra.utils; import java.lang.management.ManagementFactory; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import javax.management.JMX; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; @@ -36,7 +38,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.RowIndexEntry; +import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.CacheService; @@ -74,9 +78,10 @@ public class StatusLogger threadPoolProxy.getTotalBlockedTasks())); } // one offs - CompactionManager cm = CompactionManager.instance; logger.info(String.format("%-25s%10s%10s", - "CompactionManager", cm.getActiveCompactions(), cm.getPendingTasks())); + "CompactionManager", CompactionManager.instance.getActiveCompactions(), CompactionManager.instance.getPendingTasks())); + logger.info(String.format("%-25s%10s%10s", + "Commitlog", "n/a", CommitLog.instance.getPendingTasks())); int pendingCommands = 0; for (int n : MessagingService.instance().getCommandPendingTasks().values()) {