cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobr...@apache.org
Subject cassandra git commit: Compressed commit log should measure compressed space used
Date Fri, 05 Jun 2015 12:40:21 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 76cbddebc -> 675a0f471


Compressed commit log should measure compressed space used

patch by blambov; reviewed by jasobrown from CASSANDRA-9095


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/675a0f47
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/675a0f47
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/675a0f47

Branch: refs/heads/cassandra-2.2
Commit: 675a0f4713f3b25edcb6b8108ed4dc879fa0ec53
Parents: 76cbdde
Author: Jason Brown <jasedbrown@gmail.com>
Authored: Fri Jun 5 05:39:29 2015 -0700
Committer: Jason Brown <jasedbrown@gmail.com>
Committed: Fri Jun 5 05:39:29 2015 -0700

----------------------------------------------------------------------
 conf/cassandra.yaml                             |  2 +-
 .../cassandra/db/commitlog/CommitLog.java       | 26 +++++++-
 .../cassandra/db/commitlog/CommitLogMBean.java  | 18 ++++-
 .../db/commitlog/CommitLogSegment.java          | 16 ++++-
 .../db/commitlog/CommitLogSegmentManager.java   | 53 +++++++++------
 .../db/commitlog/CompressedSegment.java         | 12 ++++
 .../db/commitlog/MemoryMappedSegment.java       |  7 ++
 .../cassandra/metrics/CommitLogMetrics.java     |  2 +-
 .../db/commitlog/CommitLogStressTest.java       | 70 ++++++++++++++++----
 9 files changed, 166 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 2178907..ea22e01 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -355,7 +355,7 @@ concurrent_counter_writes: 32
 #   offheap_objects: native memory, eliminating nio buffer heap overhead
 memtable_allocation_type: heap_buffers
 
-# Total uncompressed size of the commit log.
+# Total space to use for commit logs on disk.
 #
 # If space gets above this value, Cassandra will flush every dirty CF
 # in the oldest segment and remove it.  So a small total commitlog space

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
index a81145d..b3f944d 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -29,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.commons.lang3.StringUtils;
 
 import com.github.tjake.ICRC32;
@@ -366,6 +367,30 @@ public class CommitLog implements CommitLogMBean
         return new ArrayList<>(archiver.archivePending.keySet());
     }
 
+    @Override
+    public long getActiveContentSize()
+    {
+        long size = 0;
+        for (CommitLogSegment segment : allocator.getActiveSegments())
+            size += segment.contentSize();
+        return size;
+    }
+
+    @Override
+    public long getActiveOnDiskSize()
+    {
+        return allocator.onDiskSize();
+    }
+
+    @Override
+    public Map<String, Double> getActiveSegmentCompressionRatios()
+    {
+        Map<String, Double> segmentRatios = new TreeMap<>();
+        for (CommitLogSegment segment : allocator.getActiveSegments())
+            segmentRatios.put(segment.getName(), 1.0 * segment.onDiskSize() / segment.contentSize());
+        return segmentRatios;
+    }
+
     /**
      * Shuts down the threads used by the commit log, blocking until completion.
      */
@@ -445,5 +470,4 @@ public class CommitLog implements CommitLogMBean
                 throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy());
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
index ecb1980..3b20bbc 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogMBean.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.commitlog;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 public interface CommitLogMBean
 {
@@ -63,9 +64,24 @@ public interface CommitLogMBean
      * @return file names (not full paths) of active commit log segments (segments containing unflushed data)
      */
     public List<String> getActiveSegmentNames();
-    
+
     /**
      * @return Files which are pending for archival attempt.  Does NOT include failed archive attempts.
      */
     public List<String> getArchivingSegmentNames();
+
+    /**
+     * @return The size of the mutations in all active commit log segments (uncompressed).
+     */
+    public long getActiveContentSize();
+
+    /**
+     * @return The space taken on disk by the commit log (compressed).
+     */
+    public long getActiveOnDiskSize();
+
+    /**
+     * @return A map between active log segments and the compression ratio achieved for each.
+     */
+    public Map<String, Double> getActiveSegmentCompressionRatios();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index ee160c3..d748006 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -113,6 +113,7 @@ public abstract class CommitLogSegment
 
     ByteBuffer buffer;
 
+    final CommitLog commitLog;
     public final CommitLogDescriptor descriptor;
 
     static CommitLogSegment createSegment(CommitLog commitLog)
@@ -132,6 +133,7 @@ public abstract class CommitLogSegment
      */
     CommitLogSegment(CommitLog commitLog)
     {
+        this.commitLog = commitLog;
         id = getNextId();
         descriptor = new CommitLogDescriptor(id, commitLog.compressorClass);
         logFile = new File(commitLog.location, descriptor.fileName());
@@ -305,9 +307,12 @@ public abstract class CommitLogSegment
     /**
      * Completely discards a segment file by deleting it. (Potentially blocking operation)
      */
-    void delete()
+    void discard(boolean deleteFile)
     {
-       FileUtils.deleteWithConfirm(logFile);
+        close();
+        if (deleteFile)
+            FileUtils.deleteWithConfirm(logFile);
+        commitLog.allocator.addSize(-onDiskSize());
     }
 
     /**
@@ -525,6 +530,13 @@ public abstract class CommitLogSegment
         return sb.toString();
     }
 
+    abstract public long onDiskSize();
+
+    public long contentSize()
+    {
+        return lastSyncedOffset;
+    }
+
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
index 5301e1b..3f00e97 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -119,7 +119,6 @@ public class CommitLogSegmentManager
                             if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments))
                             {
                                 logger.debug("No segments in reserve; creating a fresh one");
-                                size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize());
                                 // TODO : some error handling in case we fail to create a new segment
                                 availableSegments.add(CommitLogSegment.createSegment(commitLog));
                                 hasAvailableSegments.signalAll();
@@ -368,27 +367,41 @@ public class CommitLogSegmentManager
     private void discardSegment(final CommitLogSegment segment, final boolean deleteFile)
     {
         logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script");
-        size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize());
 
         segmentManagementTasks.add(new Runnable()
         {
             public void run()
             {
-                segment.close();
-                if (deleteFile)
-                    segment.delete();
+                segment.discard(deleteFile);
             }
         });
     }
 
     /**
+     * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
+     * @param addedSize
+     */
+    void addSize(long addedSize)
+    {
+        size.addAndGet(addedSize);
+    }
+
+    /**
      * @return the space (in bytes) used by all segment files.
      */
-    public long bytesUsed()
+    public long onDiskSize()
     {
         return size.get();
     }
 
+    private long unusedCapacity()
+    {
+        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
+        long currentSize = size.get();
+        logger.debug("Total active commitlog segment space used is {} out of {}", currentSize, total);
+        return total - currentSize;
+    }
+
     /**
      * @param name the filename to check
      * @return true if file is managed by this manager.
@@ -401,14 +414,6 @@ public class CommitLogSegmentManager
         return false;
     }
 
-    private long unusedCapacity()
-    {
-        long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024;
-        long currentSize = size.get();
-        logger.debug("Total active commitlog segment space used is {} out of {}", currentSize, total);
-        return total - currentSize;
-    }
-
     /**
      * Throws a flag that enables the behavior of keeping at least one spare segment
      * available at all times.
@@ -468,8 +473,7 @@ public class CommitLogSegmentManager
         logger.debug("CLSM closing and clearing existing commit log segments...");
         createReserveSegments = false;
 
-        while (!segmentManagementTasks.isEmpty())
-            Thread.yield();
+        awaitManagementTasksCompletion();
 
         shutdown();
         try
@@ -498,15 +502,22 @@ public class CommitLogSegmentManager
         logger.debug("CLSM done with closing and clearing existing commit log segments.");
     }
 
-    private static void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+    // Used by tests only.
+    void awaitManagementTasksCompletion()
     {
-        segment.close();
-        if (!delete)
-            return;
+        while (!segmentManagementTasks.isEmpty())
+            Thread.yield();
+        // The last management task is not yet complete. Wait a while for it.
+        Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        // TODO: If this functionality is required by anything other than tests, signalling must be used to ensure
+        // waiting completes correctly.
+    }
 
+    private static void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete)
+    {
         try
         {
-            segment.delete();
+            segment.discard(delete);
         }
         catch (AssertionError ignored)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
index c8101e4..8c62536 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java
@@ -53,6 +53,8 @@ public class CompressedSegment extends CommitLogSegment
     static final int COMPRESSED_MARKER_SIZE = SYNC_MARKER_SIZE + 4;
     final ICompressor compressor;
 
+    volatile long lastWrittenPos = 0;
+
     /**
      * Constructs a new segment file.
      */
@@ -63,6 +65,7 @@ public class CompressedSegment extends CommitLogSegment
         try
         {
             channel.write((ByteBuffer) buffer.duplicate().flip());
+            commitLog.allocator.addSize(lastWrittenPos = buffer.position());
         }
         catch (IOException e)
         {
@@ -120,7 +123,10 @@ public class CompressedSegment extends CommitLogSegment
             // Only one thread can be here at a given time.
             // Protected by synchronization on CommitLogSegment.sync().
             writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining());
+            commitLog.allocator.addSize(compressedBuffer.limit());
             channel.write(compressedBuffer);
+            assert channel.position() - lastWrittenPos == compressedBuffer.limit();
+            lastWrittenPos = channel.position();
             SyncUtil.force(channel, true);
         }
         catch (Exception e)
@@ -144,4 +150,10 @@ public class CompressedSegment extends CommitLogSegment
     {
         bufferPool.clear();
     }
+
+    @Override
+    public long onDiskSize()
+    {
+        return lastWrittenPos;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
index 7e74ec6..e240a91 100644
--- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java
@@ -67,6 +67,7 @@ public class MemoryMappedSegment extends CommitLogSegment
             {
                 throw new FSWriteError(e, logFile);
             }
+            commitLog.allocator.addSize(DatabaseDescriptor.getCommitLogSegmentSize());
 
             return channel.map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize());
         }
@@ -102,6 +103,12 @@ public class MemoryMappedSegment extends CommitLogSegment
     }
 
     @Override
+    public long onDiskSize()
+    {
+        return DatabaseDescriptor.getCommitLogSegmentSize();
+    }
+
+    @Override
     protected void internalClose()
     {
         if (FileUtils.isCleanerAvailable())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
index 7a9f6e5..1da6ed0 100644
--- a/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/CommitLogMetrics.java
@@ -69,7 +69,7 @@ public class CommitLogMetrics
         {
             public Long getValue()
             {
-                return allocator.bytesUsed();
+                return allocator.onDiskSize();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/675a0f47/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index 7f9df9e..ba8896f 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -29,7 +29,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -41,9 +43,9 @@ import junit.framework.Assert;
 
 import com.google.common.util.concurrent.RateLimiter;
 
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.Config.CommitLogSync;
@@ -138,7 +140,11 @@ public class CommitLogStressTest
 
         SchemaLoader.loadSchema();
         SchemaLoader.schemaDefinition(""); // leave def. blank to maintain old behaviour
-
+    }
+    
+    @Before
+    public void cleanDir()
+    {
         File dir = new File(location);
         if (dir.isDirectory())
         {
@@ -201,7 +207,7 @@ public class CommitLogStressTest
     }
 
     public void testLog(CommitLog commitLog) throws IOException, InterruptedException {
-        System.out.format("\nTesting commit log size %dmb, compressor %s, sync %s%s%s\n",
+        System.out.format("\nTesting commit log size %.0fmb, compressor %s, sync %s%s%s\n",
                            mb(DatabaseDescriptor.getCommitLogSegmentSize()),
                            commitLog.compressor != null ? commitLog.compressor.getClass().getSimpleName() : "none",
                            commitLog.executor.getClass().getSimpleName(),
@@ -223,13 +229,16 @@ public class CommitLogStressTest
             for (CommitlogExecutor t: threads)
             {
                 t.join();
-                CommitLog.instance.discardCompletedSegments( Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, t.rp);
                 if (t.rp.compareTo(discardedPos) > 0)
                     discardedPos = t.rp;
             }
+            verifySizes(commitLog);
+
+            commitLog.discardCompletedSegments(Schema.instance.getCFMetaData("Keyspace1", "Standard1").cfId, discardedPos);
             threads.clear();
             System.out.format("Discarded at %s\n", discardedPos);
-
+            verifySizes(commitLog);
+            
             scheduled = startThreads(commitLog, threads);
         }
 
@@ -246,6 +255,7 @@ public class CommitLogStressTest
             hash += t.hash;
             cells += t.cells;
         }
+        verifySizes(commitLog);
         
         commitLog.shutdownBlocking();
 
@@ -267,7 +277,39 @@ public class CommitLogStressTest
         }
     }
 
-    public ScheduledExecutorService startThreads(CommitLog commitLog, final List<CommitlogExecutor> threads)
+    private void verifySizes(CommitLog commitLog)
+    {
+        // Complete anything that's still left to write.
+        commitLog.executor.requestExtraSync().awaitUninterruptibly();
+        // One await() does not suffice as we may be signalled when an ongoing sync finished. Request another
+        // (which shouldn't write anything) to make sure the first we triggered completes.
+        // FIXME: The executor should give us a chance to await completion of the sync we requested.
+        commitLog.executor.requestExtraSync().awaitUninterruptibly();
+        // Wait for any pending deletes or segment allocations to complete.
+        commitLog.allocator.awaitManagementTasksCompletion();
+        
+        long combinedSize = 0;
+        for (File f : new File(commitLog.location).listFiles())
+            combinedSize += f.length();
+        Assert.assertEquals(combinedSize, commitLog.getActiveOnDiskSize());
+
+        List<String> logFileNames = commitLog.getActiveSegmentNames();
+        Map<String, Double> ratios = commitLog.getActiveSegmentCompressionRatios();
+        Collection<CommitLogSegment> segments = commitLog.allocator.getActiveSegments();
+
+        for (CommitLogSegment segment: segments)
+        {
+            Assert.assertTrue(logFileNames.remove(segment.getName()));
+            Double ratio = ratios.remove(segment.getName());
+            
+            Assert.assertEquals(segment.logFile.length(), segment.onDiskSize());
+            Assert.assertEquals(segment.onDiskSize() * 1.0 / segment.contentSize(), ratio, 0.01);
+        }
+        Assert.assertTrue(logFileNames.isEmpty());
+        Assert.assertTrue(ratios.isEmpty());
+    }
+
+    public ScheduledExecutorService startThreads(final CommitLog commitLog, final List<CommitlogExecutor> threads)
     {
         stop = false;
         for (int ii = 0; ii < NUM_THREADS; ii++) {
@@ -282,9 +324,9 @@ public class CommitLogStressTest
 
             public void run() {
               Runtime runtime = Runtime.getRuntime();
-              long maxMemory = mb(runtime.maxMemory());
-              long allocatedMemory = mb(runtime.totalMemory());
-              long freeMemory = mb(runtime.freeMemory());
+              long maxMemory = runtime.maxMemory();
+              long allocatedMemory = runtime.totalMemory();
+              long freeMemory = runtime.freeMemory();
               long temp = 0;
               long sz = 0;
               for (CommitlogExecutor cle : threads) {
@@ -293,9 +335,11 @@ public class CommitLogStressTest
               }
               double time = (System.currentTimeMillis() - start) / 1000.0;
               double avg = (temp / time);
-              System.out.println(String.format("second %d mem max %dmb allocated %dmb free %dmb mutations %d since start %d avg %.3f transfer %.3fmb",
+              System.out.println(
+                      String.format("second %d mem max %.0fmb allocated %.0fmb free %.0fmb mutations %d since start %d avg %.3f content %.1fmb ondisk %.1fmb transfer %.3fmb",
                       ((System.currentTimeMillis() - start) / 1000),
-                      maxMemory, allocatedMemory, freeMemory, (temp - lastUpdate), lastUpdate, avg, mb(sz / time)));
+                      mb(maxMemory), mb(allocatedMemory), mb(freeMemory), (temp - lastUpdate), lastUpdate, avg,
+                      mb(commitLog.getActiveContentSize()), mb(commitLog.getActiveOnDiskSize()), mb(sz / time)));
               lastUpdate = temp;
             }
         };
@@ -304,8 +348,8 @@ public class CommitLogStressTest
         return scheduled;
     }
 
-    private static long mb(long maxMemory) {
-        return maxMemory / (1024 * 1024);
+    private static double mb(long maxMemory) {
+        return maxMemory / (1024.0 * 1024);
     }
 
     private static double mb(double maxMemory) {


Mime
View raw message