Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 f8229c3fb -> 58e524e77
Improve MeteredFlusher handling of MF-unaffected column families
patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6867
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58e524e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58e524e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58e524e7
Branch: refs/heads/cassandra-2.0
Commit: 58e524e77f218cc75d01acc8f41c8ee6eef86207
Parents: f8229c3
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Tue Mar 25 21:55:37 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Tue Mar 25 21:55:37 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/db/MeteredFlusher.java | 164 ++++++++++---------
.../compaction/AbstractCompactionStrategy.java | 10 ++
3 files changed, 96 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 373544c..00b98fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,8 @@
* Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
* Fix paging with SELECT DISTINCT (CASSANDRA-6857)
* Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
+ * Improve MeteredFlusher handling of MF-unaffected column families
+ (CASSANDRA-6867)
Merged from 1.2:
* Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
* add extra SSL cipher suites (CASSANDRA-6613)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 5c71fc6..4f06bc6 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -22,7 +22,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,97 +33,102 @@ public class MeteredFlusher implements Runnable
public void run()
{
- long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
-
- // first, find how much memory non-active memtables are using
- ColumnFamilyStore measuredCfs = Memtable.activelyMeasuring;
- long flushingBytes = measuredCfs == null ? 0 : measuredCfs.getMemtableThreadSafe().getLiveSize();
- flushingBytes += countFlushingBytes();
- if (flushingBytes > 0)
- logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
-
- // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
- // of the total size allotted. Then, flush other CFs in order of size if necessary.
- long liveBytes = 0;
- try
- {
- long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
- for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- long size = cfs.getTotalMemtableLiveSize();
- int maxInFlight = (int) Math.ceil((double) (1 // live memtable
- + 1 // potentially a flushed memtable being counted by jamm
- + DatabaseDescriptor.getFlushWriters()
- + DatabaseDescriptor.getFlushQueueSize())
- / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
- if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher() && totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
- {
- logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
- cfs.forceFlush();
- }
- else
- {
- liveBytes += size;
- }
- }
+ long allowedSize = calculateAllowedSize();
- if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
- return;
+ // find how much memory non-active memtables are using
+ long flushingSize = calculateFlushingSize();
+ if (flushingSize > 0)
+ logger.debug("Currently flushing {} bytes of {} max", flushingSize, allowedSize);
- logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
+ List<ColumnFamilyStore> affectedCFs = affectedColumnFamilies();
+ long liveSize = 0;
- // sort memtables by size
- List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
- Iterables.addAll(sorted, ColumnFamilyStore.all());
- Collections.sort(sorted, new Comparator<ColumnFamilyStore>()
+ // flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
+ // of the total size allotted. Then, flush other CFs in order of size if necessary.
+ for (ColumnFamilyStore cfs : affectedCFs)
+ {
+ int maxInFlight = (int) Math.ceil((double) (1 // live memtable
+ + 1 // potentially a flushed memtable being counted by jamm
+ + DatabaseDescriptor.getFlushWriters()
+ + DatabaseDescriptor.getFlushQueueSize())
+ / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
+ long size = cfs.getTotalMemtableLiveSize();
+ if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / maxInFlight)
{
- public int compare(ColumnFamilyStore o1, ColumnFamilyStore o2)
- {
- long size1 = o1.getTotalMemtableLiveSize();
- long size2 = o2.getTotalMemtableLiveSize();
- if (size1 < size2)
- return -1;
- if (size1 > size2)
- return 1;
- return 0;
- }
- });
-
- // flush largest first until we get below our threshold.
- // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
- // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
- while (!sorted.isEmpty())
+ logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
+ cfs.forceFlush();
+ }
+ else
{
- flushingBytes = countFlushingBytes();
- if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
- break;
-
- ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
- if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
- {
- long size = cfs.getTotalMemtableLiveSize();
- if (size == 0)
- break;
- logger.info("flushing {} to free up {} bytes", cfs, size);
- liveBytes -= size;
- cfs.forceFlush();
- }
+ liveSize += size;
}
}
- finally
+
+ if (liveSize + flushingSize <= allowedSize)
+ return;
+ logger.info("estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);
+
+ Collections.sort(affectedCFs, new Comparator<ColumnFamilyStore>()
{
- logger.trace("memtable memory usage is {} bytes with {} live", liveBytes + flushingBytes, liveBytes);
+ public int compare(ColumnFamilyStore lhs, ColumnFamilyStore rhs)
+ {
+ return Long.compare(lhs.getTotalMemtableLiveSize(), rhs.getTotalMemtableLiveSize());
+ }
+ });
+
+ // flush largest first until we get below our threshold.
+ // although it looks like liveSize + flushingSize will stay a constant, it will not if flushes finish
+ // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
+ while (!affectedCFs.isEmpty())
+ {
+ flushingSize = calculateFlushingSize();
+ if (liveSize + flushingSize <= allowedSize)
+ break;
+
+ ColumnFamilyStore cfs = affectedCFs.remove(affectedCFs.size() - 1);
+ long size = cfs.getTotalMemtableLiveSize();
+ if (size > 0)
+ {
+ logger.info("flushing {} to free up {} bytes", cfs, size);
+ liveSize -= size;
+ cfs.forceFlush();
+ }
}
+
+ logger.trace("memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
}
- private long countFlushingBytes()
+ private static List<ColumnFamilyStore> affectedColumnFamilies()
{
- long flushingBytes = 0;
+ List<ColumnFamilyStore> affected = new ArrayList<>();
+ // filter out column families that aren't affected by MeteredFlusher
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
- {
- for (Memtable memtable : cfs.getMemtablesPendingFlush())
- flushingBytes += memtable.getLiveSize();
- }
- return flushingBytes;
+ if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+ affected.add(cfs);
+ return affected;
+ }
+
+ private static long calculateAllowedSize()
+ {
+ long allowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+ // deduct the combined memory limit of the tables unaffected by the metered flusher (we don't flush them, we
+ // should not count their limits to the total limit either).
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ if (!cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+ allowed -= cfs.getCompactionStrategy().getMemtableReservedSize();
+ return allowed;
+ }
+
+ private static long calculateFlushingSize()
+ {
+ ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring;
+ long flushing = measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher()
+ ? measuredCFS.getMemtableThreadSafe().getLiveSize()
+ : 0;
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+ for (Memtable memtable : cfs.getMemtablesPendingFlush())
+ flushing += memtable.getLiveSize();
+ return flushing;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 164cfda..5425683 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -201,6 +201,16 @@ public abstract class AbstractCompactionStrategy
}
/**
+ * If not affected by MeteredFlusher (and handling flushing on its own), override to tell MF how much
+ * space to reserve for this CF, i.e., how much space to subtract from `memtable_total_space_in_mb` when deciding
+ * if other memtables should be flushed or not.
+ */
+ public long getMemtableReservedSize()
+ {
+ return 0;
+ }
+
+ /**
* Handle a flushed memtable.
*
* @param memtable the flushed memtable
|