cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] git commit: Improve MeteredFlusher handling of MF-unaffected column families
Date Tue, 25 Mar 2014 19:03:14 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 0cd17695f -> 48847b5c0


Improve MeteredFlusher handling of MF-unaffected column families

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6867


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/58e524e7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/58e524e7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/58e524e7

Branch: refs/heads/cassandra-2.1
Commit: 58e524e77f218cc75d01acc8f41c8ee6eef86207
Parents: f8229c3
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Tue Mar 25 21:55:37 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Tue Mar 25 21:55:37 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/cassandra/db/MeteredFlusher.java | 164 ++++++++++---------
 .../compaction/AbstractCompactionStrategy.java  |  10 ++
 3 files changed, 96 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 373544c..00b98fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,8 @@
  * Static columns with IF NOT EXISTS don't always work as expected (CASSANDRA-6873)
  * Fix paging with SELECT DISTINCT (CASSANDRA-6857)
  * Fix UnsupportedOperationException on CAS timeout (CASSANDRA-6923)
+ * Improve MeteredFlusher handling of MF-unaffected column families
+   (CASSANDRA-6867)
 Merged from 1.2:
  * Add UNLOGGED, COUNTER options to BATCH documentation (CASSANDRA-6816)
  * add extra SSL cipher suites (CASSANDRA-6613)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/MeteredFlusher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MeteredFlusher.java b/src/java/org/apache/cassandra/db/MeteredFlusher.java
index 5c71fc6..4f06bc6 100644
--- a/src/java/org/apache/cassandra/db/MeteredFlusher.java
+++ b/src/java/org/apache/cassandra/db/MeteredFlusher.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
-import com.google.common.collect.Iterables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,97 +33,102 @@ public class MeteredFlusher implements Runnable
 
     public void run()
     {
-        long totalMemtableBytesAllowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
-
-        // first, find how much memory non-active memtables are using
-        ColumnFamilyStore measuredCfs = Memtable.activelyMeasuring;
-        long flushingBytes = measuredCfs == null ? 0 : measuredCfs.getMemtableThreadSafe().getLiveSize();
-        flushingBytes += countFlushingBytes();
-        if (flushingBytes > 0)
-            logger.debug("Currently flushing {} bytes of {} max", flushingBytes, totalMemtableBytesAllowed);
-
-        // next, flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
-        // of the total size allotted.  Then, flush other CFs in order of size if necessary.
-        long liveBytes = 0;
-        try
-        {
-            long totalMemtableBytesUnused = totalMemtableBytesAllowed - flushingBytes;
-            for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-            {
-                long size = cfs.getTotalMemtableLiveSize();
-                int maxInFlight = (int) Math.ceil((double) (1 // live memtable
-                                                            + 1 // potentially a flushed memtable being counted by jamm
-                                                            + DatabaseDescriptor.getFlushWriters()
-                                                            + DatabaseDescriptor.getFlushQueueSize())
-                                                  / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
-                if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher() && totalMemtableBytesUnused > 0 && size > totalMemtableBytesUnused / maxInFlight)
-                {
-                    logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
-                    cfs.forceFlush();
-                }
-                else
-                {
-                    liveBytes += size;
-                }
-            }
+        long allowedSize = calculateAllowedSize();
 
-            if (flushingBytes + liveBytes <= totalMemtableBytesAllowed)
-                return;
+        // find how much memory non-active memtables are using
+        long flushingSize = calculateFlushingSize();
+        if (flushingSize > 0)
+            logger.debug("Currently flushing {} bytes of {} max", flushingSize, allowedSize);
 
-            logger.info("estimated {} live and {} flushing bytes used by all memtables", liveBytes, flushingBytes);
+        List<ColumnFamilyStore> affectedCFs = affectedColumnFamilies();
+        long liveSize = 0;
 
-            // sort memtables by size
-            List<ColumnFamilyStore> sorted = new ArrayList<ColumnFamilyStore>();
-            Iterables.addAll(sorted, ColumnFamilyStore.all());
-            Collections.sort(sorted, new Comparator<ColumnFamilyStore>()
+        // flush CFs using more than 1 / (maximum number of memtables it could have in the pipeline)
+        // of the total size allotted. Then, flush other CFs in order of size if necessary.
+        for (ColumnFamilyStore cfs : affectedCFs)
+        {
+            int maxInFlight = (int) Math.ceil((double) (1 // live memtable
+                                                        + 1 // potentially a flushed memtable being counted by jamm
+                                                        + DatabaseDescriptor.getFlushWriters()
+                                                        + DatabaseDescriptor.getFlushQueueSize())
+                                              / (1 + cfs.indexManager.getIndexesBackedByCfs().size()));
+            long size = cfs.getTotalMemtableLiveSize();
+            if (allowedSize > flushingSize && size > (allowedSize - flushingSize) / maxInFlight)
             {
-                public int compare(ColumnFamilyStore o1, ColumnFamilyStore o2)
-                {
-                    long size1 = o1.getTotalMemtableLiveSize();
-                    long size2 = o2.getTotalMemtableLiveSize();
-                    if (size1 < size2)
-                        return -1;
-                    if (size1 > size2)
-                        return 1;
-                    return 0;
-                }
-            });
-
-            // flush largest first until we get below our threshold.
-            // although it looks like liveBytes + flushingBytes will stay a constant, it will not if flushes finish
-            // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
-            while (!sorted.isEmpty())
+                logger.info("flushing high-traffic column family {} (estimated {} bytes)", cfs, size);
+                cfs.forceFlush();
+            }
+            else
             {
-                flushingBytes = countFlushingBytes();
-                if (liveBytes + flushingBytes <= totalMemtableBytesAllowed)
-                    break;
-
-                ColumnFamilyStore cfs = sorted.remove(sorted.size() - 1);
-                if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
-                {
-                    long size = cfs.getTotalMemtableLiveSize();
-                    if (size == 0)
-                        break;
-                    logger.info("flushing {} to free up {} bytes", cfs, size);
-                    liveBytes -= size;
-                    cfs.forceFlush();
-                }
+                liveSize += size;
             }
         }
-        finally
+
+        if (liveSize + flushingSize <= allowedSize)
+            return;
+        logger.info("estimated {} live and {} flushing bytes used by all memtables", liveSize, flushingSize);
+
+        Collections.sort(affectedCFs, new Comparator<ColumnFamilyStore>()
         {
-            logger.trace("memtable memory usage is {} bytes with {} live", liveBytes + flushingBytes, liveBytes);
+            public int compare(ColumnFamilyStore lhs, ColumnFamilyStore rhs)
+            {
+                return Long.compare(lhs.getTotalMemtableLiveSize(), rhs.getTotalMemtableLiveSize());
+            }
+        });
+
+        // flush largest first until we get below our threshold.
+        // although it looks like liveSize + flushingSize will stay a constant, it will not if flushes finish
+        // while we loop, which is especially likely to happen if the flush queue fills up (so further forceFlush calls block)
+        while (!affectedCFs.isEmpty())
+        {
+            flushingSize = calculateFlushingSize();
+            if (liveSize + flushingSize <= allowedSize)
+                break;
+
+            ColumnFamilyStore cfs = affectedCFs.remove(affectedCFs.size() - 1);
+            long size = cfs.getTotalMemtableLiveSize();
+            if (size > 0)
+            {
+                logger.info("flushing {} to free up {} bytes", cfs, size);
+                liveSize -= size;
+                cfs.forceFlush();
+            }
         }
+
+        logger.trace("memtable memory usage is {} bytes with {} live", liveSize + flushingSize, liveSize);
     }
 
-    private long countFlushingBytes()
+    private static List<ColumnFamilyStore> affectedColumnFamilies()
     {
-        long flushingBytes = 0;
+        List<ColumnFamilyStore> affected = new ArrayList<>();
+        // filter out column families that aren't affected by MeteredFlusher
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-        {
-            for (Memtable memtable : cfs.getMemtablesPendingFlush())
-                flushingBytes += memtable.getLiveSize();
-        }
-        return flushingBytes;
+            if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+                affected.add(cfs);
+        return affected;
+    }
+
+    private static long calculateAllowedSize()
+    {
+        long allowed = DatabaseDescriptor.getTotalMemtableSpaceInMB() * 1048576L;
+        // deduct the combined memory limit of the tables unaffected by the metered flusher (we don't flush them, we
+        // should not count their limits to the total limit either).
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+            if (!cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+                allowed -= cfs.getCompactionStrategy().getMemtableReservedSize();
+        return allowed;
+    }
+
+    private static long calculateFlushingSize()
+    {
+        ColumnFamilyStore measuredCFS = Memtable.activelyMeasuring;
+        long flushing = measuredCFS != null && measuredCFS.getCompactionStrategy().isAffectedByMeteredFlusher()
+                      ? measuredCFS.getMemtableThreadSafe().getLiveSize()
+                      : 0;
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+            if (cfs.getCompactionStrategy().isAffectedByMeteredFlusher())
+                for (Memtable memtable : cfs.getMemtablesPendingFlush())
+                    flushing += memtable.getLiveSize();
+        return flushing;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/58e524e7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 164cfda..5425683 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -201,6 +201,16 @@ public abstract class AbstractCompactionStrategy
     }
 
     /**
+     * If not affected by MeteredFlusher (and handling flushing on its own), override to tell MF how much
+     * space to reserve for this CF, i.e., how much space to subtract from `memtable_total_space_in_mb` when deciding
+     * if other memtables should be flushed or not.
+     */
+    public long getMemtableReservedSize()
+    {
+        return 0;
+    }
+
+    /**
      * Handle a flushed memtable.
      *
      * @param memtable the flushed memtable


Mime
View raw message