cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1203729 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/compaction/ test/unit/org/apache/cassandra/db/compaction/
Date Fri, 18 Nov 2011 16:37:39 GMT
Author: jbellis
Date: Fri Nov 18 16:37:39 2011
New Revision: 1203729

URL: http://svn.apache.org/viewvc?rev=1203729&view=rev
Log:
update size-tiered compaction to prioritize small tiers
patch by jbellis; reviewed by slebresne for CASSANDRA-2407

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Nov 18 16:37:39 2011
@@ -1,4 +1,5 @@
 1.1-dev
+ * update size-tiered compaction to prioritize small tiers (CASSANDRA-2407)
  * add message expiration logic to OutboundTcpConnection (CASSANDRA-3005)
  * off-heap cache to use sun.misc.Unsafe instead of JNA (CASSANDRA-3271)
  * EACH_QUORUM is only supported for writes (CASSANDRA-3272)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Nov 18 16:37:39 2011
@@ -1036,6 +1036,11 @@ public class ColumnFamilyStore implement
         return data.getSSTables();
     }
 
+    public Set<SSTableReader> getUncompactingSSTables()
+    {
+        return data.getUncompactingSSTables();
+    }
+
     public long[] getRecentSSTablesPerReadHistogram()
     {
         return recentSSTablesPerRead.getBuckets(true);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/DataTracker.java Fri Nov 18 16:37:39 2011
@@ -80,6 +80,11 @@ public class DataTracker
         return view.get().sstables;
     }
 
+    public Set<SSTableReader> getUncompactingSSTables()
+    {
+        return view.get().nonCompactingSStables();
+    }
+
     public View getView()
     {
         return view.get();
@@ -276,7 +281,7 @@ public class DataTracker
         do
         {
             currentView = view.get();
-            notCompacting = Sets.difference(ImmutableSet.copyOf(currentView.sstables), currentView.compacting);
+            notCompacting = currentView.nonCompactingSStables();
             newView = currentView.replace(notCompacting, Collections.<SSTableReader>emptySet());
         }
         while (!view.compareAndSet(currentView, newView));
@@ -576,6 +581,11 @@ public class DataTracker
             this.intervalTree = intervalTree;
         }
 
+        public Sets.SetView<SSTableReader> nonCompactingSStables()
+        {
+            return Sets.difference(ImmutableSet.copyOf(sstables), compacting);
+        }
+
         private IntervalTree buildIntervalTree(List<SSTableReader> sstables)
         {
             List<Interval> intervals = new ArrayList<Interval>(sstables.size());

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java Fri Nov 18 16:37:39 2011
@@ -75,18 +75,17 @@ public abstract class AbstractCompaction
     public void shutdown() { }
 
     /**
-     * @return a list of compaction tasks that should run in the background to get the sstable
-     * count down to desired parameters. Will not be null, but may be empty.
+     * @return the next background/minor compaction task to run; null if nothing to do.
      * @param gcBefore throw away tombstones older than this
      */
-    public abstract List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore);
+    public abstract AbstractCompactionTask getNextBackgroundTask(final int gcBefore);
 
     /**
-     * @return a list of compaction tasks that should be run to compact this columnfamilystore
-     * as much as possible.  Will not be null, but may be empty.
+     * @return a compaction task that should be run to compact this columnfamilystore
+     * as much as possible.  Null if nothing to do.
      * @param gcBefore throw away tombstones older than this
      */
-    public abstract List<AbstractCompactionTask> getMaximalTasks(final int gcBefore);
+    public abstract AbstractCompactionTask getMaximalTask(final int gcBefore);
 
     /**
      * @return a compaction task corresponding to the requested sstables.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Fri Nov 18 16:37:39 2011
@@ -107,46 +107,37 @@ public class CompactionManager implement
      * It's okay to over-call (within reason) since the compactions are single-threaded,
      * and if a call is unnecessary, it will just be no-oped in the bucketing phase.
      */
-    public Future<Integer> submitBackground(final ColumnFamilyStore cfs)
+    public Future<?> submitBackground(final ColumnFamilyStore cfs)
     {
-        Callable<Integer> callable = new Callable<Integer>()
+        Runnable runnable = new WrappedRunnable()
         {
-            public Integer call() throws IOException
+            protected void runMayThrow() throws IOException
             {
                 compactionLock.readLock().lock();
                 try
                 {
-                    boolean taskExecuted = false;
                     AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-                    List<AbstractCompactionTask> tasks = strategy.getBackgroundTasks(getDefaultGcBefore(cfs));
-                    for (AbstractCompactionTask task : tasks)
-                    {
-                        if (!task.markSSTablesForCompaction())
-                            continue;
+                    AbstractCompactionTask task = strategy.getNextBackgroundTask(getDefaultGcBefore(cfs));
+                    if (task == null || !task.markSSTablesForCompaction())
+                        return;
 
-                        taskExecuted = true;
-                        try
-                        {
-                            task.execute(executor);
-                        }
-                        finally
-                        {
-                            task.unmarkSSTables();
-                        }
+                    try
+                    {
+                        task.execute(executor);
                     }
-
-                    // newly created sstables might have made other compactions eligible
-                    if (taskExecuted)
-                        submitBackground(cfs);
+                    finally
+                    {
+                        task.unmarkSSTables();
+                    }
+                    submitBackground(cfs);
                 }
                 finally 
                 {
                     compactionLock.readLock().unlock();
                 }
-                return 0;
             }
         };
-        return executor.submit(callable);
+        return executor.submit(runnable);
     }
 
     private static interface AllSSTablesOperation
@@ -242,40 +233,39 @@ public class CompactionManager implement
         submitMaximal(cfStore, getDefaultGcBefore(cfStore)).get();
     }
 
-    public Future<Object> submitMaximal(final ColumnFamilyStore cfStore, final int
gcBefore)
+    public Future<?> submitMaximal(final ColumnFamilyStore cfStore, final int gcBefore)
     {
-        Callable<Object> callable = new Callable<Object>()
+        Runnable runnable = new WrappedRunnable()
         {
-            public Object call() throws IOException
+            protected void runMayThrow() throws IOException
             {
                 // acquire the write lock long enough to schedule all sstables
                 compactionLock.writeLock().lock();
                 try
                 {
-                    AbstractCompactionStrategy strategy = cfStore.getCompactionStrategy();
-                    for (AbstractCompactionTask task : strategy.getMaximalTasks(gcBefore))
+                    AbstractCompactionTask task = cfStore.getCompactionStrategy().getMaximalTask(gcBefore);
+                    if (task == null)
+                        return;
+                    if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
+                        return;
+                    try
                     {
-                        if (!task.markSSTablesForCompaction(0, Integer.MAX_VALUE))
-                            return this;
+                        // downgrade the lock acquisition
+                        compactionLock.readLock().lock();
+                        compactionLock.writeLock().unlock();
                         try
                         {
-                            // downgrade the lock acquisition
-                            compactionLock.readLock().lock();
-                            compactionLock.writeLock().unlock();
-                            try
-                            {
-                                return task.execute(executor);
-                            }
-                            finally
-                            {
-                                compactionLock.readLock().unlock();
-                            }
+                            task.execute(executor);
                         }
                         finally
                         {
-                            task.unmarkSSTables();
+                            compactionLock.readLock().unlock();
                         }
                     }
+                    finally
+                    {
+                        task.unmarkSSTables();
+                    }
                 }
                 finally
                 {
@@ -283,10 +273,9 @@ public class CompactionManager implement
                     if (compactionLock.writeLock().isHeldByCurrentThread())
                         compactionLock.writeLock().unlock();
                 }
-                return this;
             }
         };
-        return executor.submit(callable);
+        return executor.submit(runnable);
     }
 
     public void forceUserDefinedCompaction(String ksname, String dataFiles)
@@ -322,11 +311,11 @@ public class CompactionManager implement
         submitUserDefined(cfs, descriptors, getDefaultGcBefore(cfs));
     }
 
-    public Future<Object> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor>
dataFiles, final int gcBefore)
+    public Future<?> submitUserDefined(final ColumnFamilyStore cfs, final Collection<Descriptor>
dataFiles, final int gcBefore)
     {
-        Callable<Object> callable = new Callable<Object>()
+        Runnable runnable = new WrappedRunnable()
         {
-            public Object call() throws IOException
+            protected void runMayThrow() throws IOException
             {
                 compactionLock.readLock().lock();
                 try
@@ -379,8 +368,6 @@ public class CompactionManager implement
                     {
                         SSTableReader.releaseReferences(sstables);
                     }
-
-                    return this;
                 }
                 finally
                 {
@@ -388,7 +375,7 @@ public class CompactionManager implement
                 }
             }
         };
-        return executor.submit(callable);
+        return executor.submit(runnable);
     }
 
     // This acquire a reference on the sstable

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java Fri Nov 18 16:37:39 2011
@@ -22,7 +22,6 @@ package org.apache.cassandra.db.compacti
 
 
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableSet;
@@ -37,7 +36,6 @@ import org.apache.cassandra.notification
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
 import org.apache.cassandra.notifications.SSTableListChangedNotification;
-import org.apache.cassandra.service.StorageService;
 
 public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
 {
@@ -91,31 +89,31 @@ public class LeveledCompactionStrategy e
         return manifest.getLevelSize(i);
     }
 
-    public List<AbstractCompactionTask> getBackgroundTasks(int gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
     {
         LeveledCompactionTask currentTask = task.get();
         if (currentTask != null && !currentTask.isDone())
         {
             logger.debug("Compaction still in progress for {}", this);
-            return Collections.emptyList();
+            return null;
         }
 
         Collection<SSTableReader> sstables = manifest.getCompactionCandidates();
         if (sstables.isEmpty())
         {
             logger.debug("No compaction necessary for {}", this);
-            return Collections.emptyList();
+            return null;
         }
 
         LeveledCompactionTask newTask = new LeveledCompactionTask(cfs, sstables, gcBefore,
this.maxSSTableSizeInMB);
         return task.compareAndSet(currentTask, newTask)
-               ? Collections.<AbstractCompactionTask>singletonList(newTask)
-               : Collections.<AbstractCompactionTask>emptyList();
+               ? newTask
+               : null;
     }
 
-    public List<AbstractCompactionTask> getMaximalTasks(int gcBefore)
+    public AbstractCompactionTask getMaximalTask(int gcBefore)
     {
-        return getBackgroundTasks(gcBefore);
+        return getNextBackgroundTask(gcBefore);
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables,
int gcBefore)

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java Fri Nov 18 16:37:39 2011
@@ -46,17 +46,18 @@ public class SizeTieredCompactionStrateg
        minSSTableSize = (null != optionValue) ? Long.parseLong(optionValue) : DEFAULT_MIN_SSTABLE_SIZE;
     }
 
-    public List<AbstractCompactionTask> getBackgroundTasks(final int gcBefore)
+    public AbstractCompactionTask getNextBackgroundTask(final int gcBefore)
     {
         if (cfs.isCompactionDisabled())
         {
             logger.debug("Compaction is currently disabled.");
-            return Collections.<AbstractCompactionTask>emptyList();
+            return null;
         }
 
-        List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
-        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getSSTables()),
minSSTableSize);
+        List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(cfs.getUncompactingSSTables()),
minSSTableSize);
+        updateEstimatedCompactionsByTasks(buckets);
 
+        List<List<SSTableReader>> prunedBuckets = new ArrayList<List<SSTableReader>>();
         for (List<SSTableReader> bucket : buckets)
         {
             if (bucket.size() < cfs.getMinimumCompactionThreshold())
@@ -69,19 +70,38 @@ public class SizeTieredCompactionStrateg
                     return o1.descriptor.generation - o2.descriptor.generation;
                 }
             });
-            tasks.add(new CompactionTask(cfs, bucket.subList(0, Math.min(bucket.size(), cfs.getMaximumCompactionThreshold())),
gcBefore));
+            prunedBuckets.add(bucket.subList(0, Math.min(bucket.size(), cfs.getMaximumCompactionThreshold())));
         }
 
-        updateEstimatedCompactionsByTasks(tasks);
-        return tasks;
+        if (prunedBuckets.isEmpty())
+            return null;
+
+        List<SSTableReader> smallestBucket = Collections.min(prunedBuckets, new Comparator<List<SSTableReader>>()
+        {
+            public int compare(List<SSTableReader> o1, List<SSTableReader> o2)
+            {
+                long n = avgSize(o1) - avgSize(o2);
+                if (n < 0)
+                    return -1;
+                if (n > 0)
+                    return 1;
+                return 0;
+            }
+
+            private long avgSize(List<SSTableReader> sstables)
+            {
+                long n = 0;
+                for (SSTableReader sstable : sstables)
+                    n += sstable.bytesOnDisk();
+                return n / sstables.size();
+            }
+        });
+        return new CompactionTask(cfs, smallestBucket, gcBefore);
     }
 
-    public List<AbstractCompactionTask> getMaximalTasks(final int gcBefore)
+    public AbstractCompactionTask getMaximalTask(final int gcBefore)
     {
-        List<AbstractCompactionTask> tasks = new LinkedList<AbstractCompactionTask>();
-        if (!cfs.getSSTables().isEmpty())
-            tasks.add(new CompactionTask(cfs, cfs.getSSTables(), gcBefore));
-        return tasks;
+        return cfs.getSSTables().isEmpty() ? null : new CompactionTask(cfs, cfs.getSSTables(),
gcBefore);
     }
 
     public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables,
final int gcBefore)
@@ -158,17 +178,13 @@ public class SizeTieredCompactionStrateg
         return new LinkedList<List<T>>(buckets.keySet());
     }
 
-    private void updateEstimatedCompactionsByTasks(List<AbstractCompactionTask> tasks)
+    private void updateEstimatedCompactionsByTasks(List<List<SSTableReader>>
tasks)
     {
         int n = 0;
-        for (AbstractCompactionTask task: tasks)
+        for (List<SSTableReader> bucket: tasks)
         {
-            if (!(task instanceof CompactionTask))
-                continue;
-
-            Collection<SSTableReader> sstablesToBeCompacted = task.getSSTables();
-            if (sstablesToBeCompacted.size() >= cfs.getMinimumCompactionThreshold())
-                n += Math.ceil((double)sstablesToBeCompacted.size() / cfs.getMaximumCompactionThreshold());
+            if (bucket.size() >= cfs.getMinimumCompactionThreshold())
+                n += Math.ceil((double)bucket.size() / cfs.getMaximumCompactionThreshold());
         }
         estimatedRemainingTasks = n;
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java?rev=1203729&r1=1203728&r2=1203729&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java Fri Nov 18 16:37:39 2011
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class CompactionsTest extends CleanupHelper
 {
@@ -140,20 +141,16 @@ public class CompactionsTest extends Cle
         store.setMaximumCompactionThreshold(4);
 
         // loop submitting parallel compactions until they all return 0
-        while (true)
+        do
         {
-            ArrayList<Future<Integer>> compactions = new ArrayList<Future<Integer>>();
+            ArrayList<Future<?>> compactions = new ArrayList<Future<?>>();
             for (int i = 0; i < 10; i++)
                 compactions.add(CompactionManager.instance.submitBackground(store));
             // another compaction attempt will be launched in the background by
             // each completing compaction: not much we can do to control them here
-            boolean progress = false;
-            for (Future<Integer> compaction : compactions)
-               if (compaction.get() > 0)
-                   progress = true;
-            if (!progress)
-                break;
-        }
+            FBUtilities.waitOnFutures(compactions);
+        } while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions()
> 0);
+
         if (store.getSSTables().size() > 1)
         {
             CompactionManager.instance.performMaximal(store);



Mime
View raw message