cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [3/3] git commit: Merge branch 'cassandra-2.1' into trunk
Date Wed, 05 Nov 2014 08:50:50 GMT
Merge branch 'cassandra-2.1' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/compaction/CompactionManager.java
	src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
	src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
	src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
	test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
	test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/af44d1a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/af44d1a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/af44d1a7

Branch: refs/heads/trunk
Commit: af44d1a7c3baad2b5ce006b9c0f5249c38cb0504
Parents: e369ff6 e16f584
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Wed Nov 5 09:47:48 2014 +0100
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Wed Nov 5 09:47:48 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/CFMetaData.java |   3 +
 .../cassandra/db/CollationController.java       |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |  57 +---
 .../compaction/AbstractCompactionStrategy.java  |  14 +
 .../db/compaction/CompactionManager.java        |   2 +-
 .../DateTieredCompactionStrategy.java           |  46 +--
 .../compaction/LeveledCompactionStrategy.java   |  60 +---
 .../db/compaction/LeveledManifest.java          | 111 ++-----
 .../SizeTieredCompactionStrategy.java           |  63 ++--
 .../compaction/WrappingCompactionStrategy.java  | 331 +++++++++++++++++++
 .../io/sstable/format/SSTableReader.java        |   3 +-
 .../db/compaction/CompactionsTest.java          |   4 +-
 .../LeveledCompactionStrategyTest.java          |  88 +++--
 14 files changed, 486 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 817cbcf,80f4c8f..6ba76f9
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,37 -1,5 +1,38 @@@
 +3.0
 + * Mark sstables as repaired after full repair (CASSANDRA-7586) 
 + * Extend Descriptor to include a format value and refactor reader/writer apis (CASSANDRA-7443)
 + * Integrate JMH for microbenchmarks (CASSANDRA-8151)
 + * Keep sstable levels when bootstrapping (CASSANDRA-7460)
 + * Add Sigar library and perform basic OS settings check on startup (CASSANDRA-7838)
 + * Support for aggregation functions (CASSANDRA-4914)
 + * Remove cassandra-cli (CASSANDRA-7920)
 + * Accept dollar quoted strings in CQL (CASSANDRA-7769)
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Support pure user-defined functions (CASSANDRA-7395, 7526, 7562, 7740, 7781, 7929,
 +   7924, 7812, 8063)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 + * Fail on very large batch sizes (CASSANDRA-8011)
 + * improve concurrency of repair (CASSANDRA-6455)
 +
 +
  2.1.2
+  * Improve compaction of repaired/unrepaired sstables (CASSANDRA-8004)
   * Make cache serializers pluggable (CASSANDRA-8096)
   * Fix issues with CONTAINS (KEY) queries on secondary indexes
     (CASSANDRA-8147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 7a1f883,a560234..3ac16ab
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@@ -44,13 -32,9 +44,8 @@@ import org.apache.cassandra.db.columnit
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
- import org.apache.cassandra.notifications.INotification;
- import org.apache.cassandra.notifications.INotificationConsumer;
- import org.apache.cassandra.notifications.SSTableAddedNotification;
- import org.apache.cassandra.notifications.SSTableListChangedNotification;
- import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
 -import org.apache.cassandra.io.sstable.SSTableReader;
  
- public class LeveledCompactionStrategy extends AbstractCompactionStrategy implements INotificationConsumer
+ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
  {
      private static final Logger logger = LoggerFactory.getLogger(LeveledCompactionStrategy.class);
      private static final String SSTABLE_SIZE_OPTION = "sstable_size_in_mb";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 74be143,4b26d23..a4e420f
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@@ -320,31 -272,6 +272,18 @@@ public class LeveledManifes
       */
      public synchronized CompactionCandidate getCompactionCandidates()
      {
-         // if we don't have any repaired data, continue as usual
-         if (hasRepairedData)
-         {
-             Collection<SSTableReader> unrepairedMostInterresting = getSSTablesForSTCS(unrepairedL0);
-             if (!unrepairedMostInterresting.isEmpty())
-             {
-                 logger.info("Unrepaired data is most interresting, compacting {} sstables
with STCS", unrepairedMostInterresting.size());
-                 for (SSTableReader reader : unrepairedMostInterresting)
-                     assert !reader.isRepaired();
-                 return new CompactionCandidate(unrepairedMostInterresting, 0, Long.MAX_VALUE);
-             }
-         }
- 
 +        // during bootstrap we only do size tiering in L0 to make sure
 +        // the streamed files can be placed in their original levels
 +        if (StorageService.instance.isBootstrapMode())
 +        {
 +            List<SSTableReader> mostInteresting = getSSTablesForSTCS(getLevel(0));
 +            if (!mostInteresting.isEmpty())
 +            {
 +                logger.info("Bootstrapping - doing STCS in L0");
 +                return new CompactionCandidate(mostInteresting, 0, Long.MAX_VALUE);
 +            }
 +            return null;
 +        }
          // LevelDB gives each level a score of how much data it contains vs its ideal amount,
and
          // compacts the level with the highest score. But this falls apart spectacularly
once you
          // get behind.  Consider this set of levels:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 0abb68d,b72737a..fb6b060
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@@ -24,8 -24,6 +24,7 @@@ import com.google.common.annotations.Vi
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Lists;
  import com.google.common.collect.Sets;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
- import org.apache.cassandra.io.sstable.format.big.BigTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/af44d1a7/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
index 0000000,1d713ef..32e63bb
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/WrappingCompactionStrategy.java
@@@ -1,0 -1,331 +1,331 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collection;
+ import java.util.List;
+ import java.util.concurrent.Callable;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.ColumnFamilyStore;
+ import org.apache.cassandra.dht.Range;
+ import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.io.sstable.SSTableReader;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.notifications.INotification;
+ import org.apache.cassandra.notifications.INotificationConsumer;
+ import org.apache.cassandra.notifications.SSTableAddedNotification;
+ import org.apache.cassandra.notifications.SSTableDeletingNotification;
+ import org.apache.cassandra.notifications.SSTableListChangedNotification;
+ import org.apache.cassandra.notifications.SSTableRepairStatusChanged;
+ 
+ public final class WrappingCompactionStrategy extends AbstractCompactionStrategy implements
INotificationConsumer
+ {
+     private static final Logger logger = LoggerFactory.getLogger(WrappingCompactionStrategy.class);
+     private volatile AbstractCompactionStrategy repaired;
+     private volatile AbstractCompactionStrategy unrepaired;
+     public WrappingCompactionStrategy(ColumnFamilyStore cfs)
+     {
+         super(cfs, cfs.metadata.compactionStrategyOptions);
+         reloadCompactionStrategy(cfs.metadata);
+         cfs.getDataTracker().subscribe(this);
+         logger.debug("{} subscribed to the data tracker.", this);
+     }
+ 
+     @Override
+     public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore)
+     {
+         if (!isEnabled())
+             return null;
+ 
+         if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks())
+         {
+             AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore);
+             if (repairedTask != null)
+                 return repairedTask;
+             return unrepaired.getNextBackgroundTask(gcBefore);
+         }
+         else
+         {
+             AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore);
+             if (unrepairedTask != null)
+                 return unrepairedTask;
+             return repaired.getNextBackgroundTask(gcBefore);
+         }
+ 
+     }
+ 
+     @Override
+     public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore)
+     {
+         // runWithCompactionsDisabled cancels active compactions and disables them, then
we are able
+         // to make the repaired/unrepaired strategies mark their own sstables as compacting.
Once the
+         // sstables are marked the compactions are re-enabled
+         return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>()
+         {
+             @Override
+             public Collection<AbstractCompactionTask> call() throws Exception
+             {
+                 synchronized (WrappingCompactionStrategy.this)
+                 {
+                     Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore);
+                     Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore);
+ 
+                     if (repairedTasks == null && unrepairedTasks == null)
+                         return null;
+ 
+                     if (repairedTasks == null)
+                         return unrepairedTasks;
+                     if (unrepairedTasks == null)
+                         return repairedTasks;
+ 
+                     List<AbstractCompactionTask> tasks = new ArrayList<>();
+                     tasks.addAll(repairedTasks);
+                     tasks.addAll(unrepairedTasks);
+                     return tasks;
+                 }
+             }
+         }, false);
+     }
+ 
+     @Override
+     public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader>
sstables, int gcBefore)
+     {
+         assert !sstables.isEmpty();
+         boolean userDefinedInRepaired = sstables.iterator().next().isRepaired();
+         for (SSTableReader sstable : sstables)
+         {
+             if (userDefinedInRepaired != sstable.isRepaired())
+             {
+                 logger.error("You can't mix repaired and unrepaired sstables in a user defined
compaction");
+                 return null;
+             }
+         }
+         if (userDefinedInRepaired)
+             return repaired.getUserDefinedTask(sstables, gcBefore);
+         else
+             return unrepaired.getUserDefinedTask(sstables, gcBefore);
+     }
+ 
+     @Override
+     public synchronized int getEstimatedRemainingTasks()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return repaired.getEstimatedRemainingTasks() + unrepaired.getEstimatedRemainingTasks();
+     }
+ 
+     @Override
+     public synchronized long getMaxSSTableBytes()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return unrepaired.getMaxSSTableBytes();
+     }
+ 
+     public synchronized void maybeReloadCompactionStrategy(CFMetaData metadata)
+     {
+         if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass)
+             && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass)
+             && repaired.options.equals(metadata.compactionStrategyOptions)
+             && unrepaired.options.equals(metadata.compactionStrategyOptions))
+             return;
+ 
+         reloadCompactionStrategy(metadata);
+     }
+ 
+     public synchronized void reloadCompactionStrategy(CFMetaData metadata)
+     {
+         if (repaired != null)
+             repaired.shutdown();
+         if (unrepaired != null)
+             unrepaired.shutdown();
+         repaired = metadata.createCompactionStrategyInstance(cfs);
+         unrepaired = metadata.createCompactionStrategyInstance(cfs);
+         startup();
+     }
+ 
+     public synchronized int getUnleveledSSTables()
+     {
+         if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired
instanceof LeveledCompactionStrategy)
+         {
+             return ((LeveledCompactionStrategy)repaired).getLevelSize(0) + ((LeveledCompactionStrategy)unrepaired).getLevelSize(0);
+         }
+         return 0;
+     }
+ 
+     public synchronized int[] getSSTableCountPerLevel()
+     {
+         if (this.repaired instanceof LeveledCompactionStrategy && this.unrepaired
instanceof LeveledCompactionStrategy)
+         {
+             int [] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize();
+             int [] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize();
+             return sumArrays(repairedCountPerLevel, unrepairedCountPerLevel);
+         }
+         return null;
+     }
+ 
+     public static int [] sumArrays(int[] a, int [] b)
+     {
+         int [] res = new int[Math.max(a.length, b.length)];
+         for (int i = 0; i < res.length; i++)
+         {
+             if (i < a.length && i < b.length)
+                 res[i] = a[i] + b[i];
+             else if (i < a.length)
+                 res[i] = a[i];
+             else
+                 res[i] = b[i];
+         }
+         return res;
+     }
+ 
+     @Override
+     public boolean shouldDefragment()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return repaired.shouldDefragment();
+     }
+ 
+     @Override
+     public String getName()
+     {
+         assert repaired.getClass().equals(unrepaired.getClass());
+         return repaired.getName();
+     }
+ 
+     @Override
+     public void addSSTable(SSTableReader added)
+     {
+         throw new UnsupportedOperationException("Can't add sstables to the wrapping compaction
strategy");
+     }
+ 
+     @Override
+     public void removeSSTable(SSTableReader sstable)
+     {
+         throw new UnsupportedOperationException("Can't remove sstables from the wrapping
compaction strategy");
+     }
+ 
+     public synchronized void handleNotification(INotification notification, Object sender)
+     {
+         if (notification instanceof SSTableAddedNotification)
+         {
+             SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification;
+             if (flushedNotification.added.isRepaired())
+                 repaired.addSSTable(flushedNotification.added);
+             else
+                 unrepaired.addSSTable(flushedNotification.added);
+         }
+         else if (notification instanceof SSTableListChangedNotification)
+         {
+             SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification)
notification;
+             for (SSTableReader sstable : listChangedNotification.removed)
+             {
+                 if (sstable.isRepaired())
+                     repaired.removeSSTable(sstable);
+                 else
+                     unrepaired.removeSSTable(sstable);
+             }
+             for (SSTableReader sstable : listChangedNotification.added)
+             {
+                 if (sstable.isRepaired())
+                     repaired.addSSTable(sstable);
+                 else
+                     unrepaired.addSSTable(sstable);
+             }
+         }
+         else if (notification instanceof SSTableRepairStatusChanged)
+         {
+             for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable)
+             {
+                 if (sstable.isRepaired())
+                 {
+                     unrepaired.removeSSTable(sstable);
+                     repaired.addSSTable(sstable);
+                 }
+                 else
+                 {
+                     repaired.removeSSTable(sstable);
+                     unrepaired.addSSTable(sstable);
+                 }
+             }
+         }
+         else if (notification instanceof SSTableDeletingNotification)
+         {
+             SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting;
+             if (sstable.isRepaired())
+                 repaired.removeSSTable(sstable);
+             else
+                 unrepaired.removeSSTable(sstable);
+         }
+     }
+ 
+     @Override
+     public List<SSTableReader> filterSSTablesForReads(List<SSTableReader> sstables)
+     {
+         // todo: union of filtered sstables or intersection?
+         return unrepaired.filterSSTablesForReads(repaired.filterSSTablesForReads(sstables));
+     }
+ 
+     @Override
+     public synchronized void startup()
+     {
+         super.startup();
+         for (SSTableReader sstable : cfs.getSSTables())
+         {
+             if (sstable.isRepaired())
+                 repaired.addSSTable(sstable);
+             else
+                 unrepaired.addSSTable(sstable);
+         }
+         repaired.startup();
+         unrepaired.startup();
+     }
+ 
+     @Override
+     public synchronized void shutdown()
+     {
+         super.shutdown();
+         repaired.shutdown();
+         unrepaired.shutdown();
+     }
+ 
+     @Override
+     public synchronized ScannerList getScanners(Collection<SSTableReader> sstables,
Range<Token> range)
+     {
+         List<SSTableReader> repairedSSTables = new ArrayList<>();
+         List<SSTableReader> unrepairedSSTables = new ArrayList<>();
+         for (SSTableReader sstable : sstables)
+             if (sstable.isRepaired())
+                 repairedSSTables.add(sstable);
+             else
+                 unrepairedSSTables.add(sstable);
+         ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range);
+         ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range);
+         List<ICompactionScanner> scanners = new ArrayList<>(repairedScanners.scanners.size()
+ unrepairedScanners.scanners.size());
+         scanners.addAll(repairedScanners.scanners);
+         scanners.addAll(unrepairedScanners.scanners);
+         return new ScannerList(scanners);
+     }
+ 
+     public List<AbstractCompactionStrategy> getWrappedStrategies()
+     {
+         return Arrays.asList(repaired, unrepaired);
+     }
+ }


Mime
View raw message