cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject git commit: Anticompact sstables as groups
Date Wed, 20 Aug 2014 11:28:26 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 060c7961d -> 37f517593


Anticompact sstables as groups

Patch by Russell Spitzer; reviewed by marcuse for CASSANDRA-6851


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37f51759
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37f51759
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37f51759

Branch: refs/heads/trunk
Commit: 37f5175935a37ce2c005335c2f486efb827b6eba
Parents: 060c796
Author: Russell Spitzer <Russell.Spitzer@gmail.com>
Authored: Wed Aug 20 13:27:52 2014 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Wed Aug 20 13:27:52 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../compaction/AbstractCompactionStrategy.java  |  31 ++++
 .../db/compaction/CompactionManager.java        | 174 +++++++++++++------
 .../compaction/LeveledCompactionStrategy.java   |  57 +++++-
 .../db/compaction/AntiCompactionTest.java       |  96 +++++++++-
 .../LeveledCompactionStrategyTest.java          |  61 ++++++-
 6 files changed, 358 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d1dedbf..80ddddc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Do anticompaction in groups (CASSANDRA-6851)
  * Verify that UDF class methods are static (CASSANDRA-7781)
  * Support pure user-defined functions (CASSANDRA-7395)
  * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 1b7786e..28ab84e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -399,4 +399,35 @@ public abstract class AbstractCompactionStrategy
 
         return optionValue == null || Boolean.parseBoolean(optionValue);
     }
+
+
+    /**
+     * Method for grouping similar SSTables together, This will be used by
+     * anti-compaction to determine which SSTables should be anitcompacted
+     * as a group. If a given compaction strategy creates sstables which
+     * cannot be merged due to some constraint it must override this method.
+     */
+    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader>
sstablesToGroup)
+    {
+        int groupSize = 2;
+        List<SSTableReader> sortedSSTablesToGroup = new ArrayList<>(sstablesToGroup);
+        Collections.sort(sortedSSTablesToGroup, SSTableReader.sstableComparator);
+
+        Collection<Collection<SSTableReader>> groupedSSTables = new ArrayList<>();
+        Collection<SSTableReader> currGroup = new ArrayList<>();
+
+        for (SSTableReader sstable : sortedSSTablesToGroup)
+        {
+            currGroup.add(sstable);
+            if (currGroup.size() == groupSize)
+            {
+                groupedSSTables.add(currGroup);
+                currGroup = new ArrayList<>();
+            }
+        }
+
+        if (currGroup.size() != 0)
+            groupedSSTables.add(currGroup);
+        return groupedSSTables;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e578ddf..5af7139 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -52,6 +52,7 @@ import com.google.common.collect.Multimap;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -397,7 +398,7 @@ public class CompactionManager implements CompactionManagerMBean
                                       Collection<SSTableReader> validatedForRepair,
                                       long repairedAt) throws InterruptedException, ExecutionException,
IOException
     {
-        logger.info("Starting anticompaction");
+        logger.info("Starting anticompaction for {}/{}", cfs.keyspace.getName(), cfs.getColumnFamilyName());
         logger.debug("Starting anticompaction for ranges {}", ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
@@ -847,6 +848,37 @@ public class CompactionManager implements CompactionManagerMBean
                                  new MetadataCollector(Collections.singleton(sstable), cfs.metadata.comparator,
sstable.getSSTableLevel()));
     }
 
+    public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs,
+                                             File compactionFileLocation,
+                                             int expectedBloomFilterSize,
+                                             long repairedAt,
+                                             Collection<SSTableReader> sstables)
+    {
+        FileUtils.createDirectory(compactionFileLocation);
+        int minLevel = Integer.MAX_VALUE;
+        // if all sstables have the same level, we can compact them together without creating
overlap during anticompaction
+        // note that we only anticompact from unrepaired sstables, which is not leveled,
but we still keep original level
+        // after first migration to be able to drop the sstables back in their original place
in the repaired sstable manifest
+        for (SSTableReader sstable : sstables)
+        {
+            if (minLevel == Integer.MAX_VALUE)
+                minLevel = sstable.getSSTableLevel();
+
+            if (minLevel != sstable.getSSTableLevel())
+            {
+                minLevel = 0;
+                break;
+            }
+        }
+        return new SSTableWriter(cfs.getTempSSTablePath(compactionFileLocation),
+                                 expectedBloomFilterSize,
+                                 repairedAt,
+                                 cfs.metadata,
+                                 cfs.partitioner,
+                                 new MetadataCollector(sstables, cfs.metadata.comparator,
minLevel));
+    }
+
+
     /**
      * Performs a readonly "compaction" of all sstables in order to validate complete rows,
      * but without writing the merge result
@@ -947,6 +979,8 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+
+
     /**
      * Splits up an sstable into two new sstables. The first of the new tables will store
repaired ranges, the second
      * will store the non-repaired ranges. Once anticompation is completed, the original
sstable is marked as compacted
@@ -956,83 +990,111 @@ public class CompactionManager implements CompactionManagerMBean
      * @param ranges Repaired ranges to be placed into one of the new sstables. The repaired
table will be tracked via
      * the {@link org.apache.cassandra.io.sstable.metadata.StatsMetadata#repairedAt} field.
      */
-    private Collection<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>>
ranges, Collection<SSTableReader> repairedSSTables, long repairedAt)
+    private void doAntiCompaction(ColumnFamilyStore cfs, Collection<Range<Token>>
ranges,
+                                                       Collection<SSTableReader> repairedSSTables,
long repairedAt)
     {
-        List<SSTableReader> anticompactedSSTables = new ArrayList<>();
-        int repairedKeyCount = 0;
-        int unrepairedKeyCount = 0;
         // TODO(5351): we can do better here:
-        int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(repairedSSTables)));
         logger.info("Performing anticompaction on {} sstables", repairedSSTables.size());
+
+        //Group SSTables
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(repairedSSTables);
         // iterate over sstables to check if the repaired / unrepaired ranges intersect them.
-        for (SSTableReader sstable : repairedSSTables)
+        int antiCompactedSSTableCount = 0;
+        for (Collection<SSTableReader> sstableGroup : groupedSSTables)
         {
-            // check that compaction hasn't stolen any sstables used in previous repair sessions
-            // if we need to skip the anticompaction, it will be carried out by the next
repair
+            int antiCompacted = antiCompactGroup(cfs, ranges, sstableGroup, repairedAt);
+            antiCompactedSSTableCount += antiCompacted;
+        }
+
+        String format = "Anticompaction completed successfully, anticompacted from {} to
{} sstable(s).";
+        logger.info(format, repairedSSTables.size(), antiCompactedSSTableCount);
+    }
+
+    private int antiCompactGroup(ColumnFamilyStore cfs, Collection<Range<Token>>
ranges,
+                             Collection<SSTableReader> anticompactionGroup, long repairedAt)
+    {
+        long groupMaxDataAge = -1;
+
+        // check that compaction hasn't stolen any sstables used in previous repair sessions
+        // if we need to skip the anticompaction, it will be carried out by the next repair
+        for (Iterator<SSTableReader> i = anticompactionGroup.iterator(); i.hasNext();)
+        {
+            SSTableReader sstable = i.next();
             if (!new File(sstable.getFilename()).exists())
             {
                 logger.info("Skipping anticompaction for {}, required sstable was compacted
and is no longer available.", sstable);
+                i.remove();
                 continue;
             }
+            if (groupMaxDataAge < sstable.maxDataAge)
+                groupMaxDataAge = sstable.maxDataAge;
+        }
+
+        if (anticompactionGroup.size() == 0)
+        {
+            logger.info("No valid anticompactions for this group, All sstables were compacted
and are no longer available");
+            return 0;
+        }
 
-            logger.info("Anticompacting {}", sstable);
-            Set<SSTableReader> sstableAsSet = new HashSet<>();
-            sstableAsSet.add(sstable);
+        logger.info("Anticompacting {}", anticompactionGroup);
+        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
 
-            File destination = cfs.directories.getDirectoryForNewSSTables();
-            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet,
sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
-            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet,
sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
+        File destination = cfs.directories.getDirectoryForNewSSTables();
+        SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge,
OperationType.ANTICOMPACTION, false);
+        SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet,
groupMaxDataAge, OperationType.ANTICOMPACTION, false);
 
-            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-            List<ICompactionScanner> scanners = strategy.getScanners(Arrays.asList(sstable));
+        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
+        List<ICompactionScanner> scanners = strategy.getScanners(anticompactionGroup);
 
-            try (CompactionController controller = new CompactionController(cfs, new HashSet<>(Collections.singleton(sstable)),
CFMetaData.DEFAULT_GC_GRACE_SECONDS))
-            {
-                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination,
expectedBloomFilterSize, repairedAt, sstable));
-                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs,
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
+        int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
 
-                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION,
scanners, controller);
+        long repairedKeyCount = 0;
+        long unrepairedKeyCount = 0;
+        try (CompactionController controller = new CompactionController(cfs, sstableAsSet,
CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+        {
+            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
 
-                try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION,
scanners, controller);
+
+            try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
+            {
+                while(iter.hasNext())
                 {
-                    while(iter.hasNext())
+                    AbstractCompactedRow row = iter.next();
+                    // if current range from sstable is repaired, save it into the new repaired
sstable
+                    if (Range.isInRanges(row.key.getToken(), ranges))
+                    {
+                        repairedSSTableWriter.append(row);
+                        repairedKeyCount++;
+                    }
+                    // otherwise save into the new 'non-repaired' table
+                    else
                     {
-                        AbstractCompactedRow row = iter.next();
-                        // if current range from sstable is repaired, save it into the new
repaired sstable
-                        if (Range.isInRanges(row.key.getToken(), ranges))
-                        {
-                            repairedSSTableWriter.append(row);
-                            repairedKeyCount++;
-                        }
-                        // otherwise save into the new 'non-repaired' table
-                        else
-                        {
-                            unRepairedSSTableWriter.append(row);
-                            unrepairedKeyCount++;
-                        }
+                        unRepairedSSTableWriter.append(row);
+                        unrepairedKeyCount++;
                     }
                 }
-                // we have the same readers being rewritten by both writers, so we ask the
first one NOT to close them
-                // so that the second one can do so safely, without leaving us with references
< 0 or any other ugliness
-                repairedSSTableWriter.finish(false, repairedAt);
-                unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
-                // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
-                anticompactedSSTables.addAll(repairedSSTableWriter.finished());
-                anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
-            }
-            catch (Throwable e)
-            {
-                logger.error("Error anticompacting " + sstable, e);
-                repairedSSTableWriter.abort();
-                unRepairedSSTableWriter.abort();
             }
+            // we have the same readers being rewritten by both writers, so we ask the first
one NOT to close them
+            // so that the second one can do so safely, without leaving us with references
< 0 or any other ugliness
+            repairedSSTableWriter.finish(false, repairedAt);
+            unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
+            // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
+            logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
+                                                                       repairedKeyCount +
unrepairedKeyCount,
+                                                                       cfs.keyspace.getName(),
+                                                                       cfs.getColumnFamilyName(),
+                                                                       anticompactionGroup);
+            return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
         }
-        String format = "Repaired {} keys of {} for {}/{}";
-        logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace,
cfs.getColumnFamilyName());
-        String format2 = "Anticompaction completed successfully, anticompacted from {} to
{} sstable(s).";
-        logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
-
-        return anticompactedSSTables;
+        catch (Throwable e)
+        {
+            logger.error("Error anticompacting " + anticompactionGroup, e);
+            repairedSSTableWriter.abort();
+            unRepairedSSTableWriter.abort();
+        }
+        return 0;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 3ee59ad..b179b3a 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -18,7 +18,18 @@
 package org.apache.cassandra.db.compaction;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -170,6 +181,50 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
implem
         return new LeveledCompactionTask(cfs, sstables, level, gcBefore, maxSSTableBytes);
     }
 
+    /**
+     * Leveled compaction strategy has guarantees on the data contained within each level
so we
+     * have to make sure we only create groups of SSTables with members from the same level.
+     * This way we won't end up creating invalid sstables during anti-compaction.
+     * @param ssTablesToGroup
+     * @return Groups of sstables from the same level
+     */
+    @Override
+    public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader>
ssTablesToGroup)
+    {
+        int groupSize = 2;
+        Map<Integer, Collection<SSTableReader>> sstablesByLevel = new HashMap<>();
+        for (SSTableReader sstable : ssTablesToGroup)
+        {
+            Integer level = sstable.getSSTableLevel();
+            if (!sstablesByLevel.containsKey(level))
+            {
+                sstablesByLevel.put(level, new ArrayList<SSTableReader>());
+            }
+            sstablesByLevel.get(level).add(sstable);
+        }
+
+        Collection<Collection<SSTableReader>> groupedSSTables = new ArrayList<>();
+
+        for (Collection<SSTableReader> levelOfSSTables : sstablesByLevel.values())
+        {
+            Collection<SSTableReader> currGroup = new ArrayList<>();
+            for (SSTableReader sstable : levelOfSSTables)
+            {
+                currGroup.add(sstable);
+                if (currGroup.size() == groupSize)
+                {
+                    groupedSSTables.add(currGroup);
+                    currGroup = new ArrayList<>();
+                }
+            }
+
+            if (currGroup.size() != 0)
+                groupedSSTables.add(currGroup);
+        }
+        return groupedSSTables;
+
+    }
+
     public int getEstimatedRemainingTasks()
     {
         return manifest.getEstimatedTasks();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index e47f0e9..f632a65 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.junit.BeforeClass;
+import org.junit.After;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -41,6 +42,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import static junit.framework.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
@@ -51,14 +53,23 @@ public class AntiCompactionTest
     private static final String KEYSPACE1 = "AntiCompactionTest";
     private static final String CF = "Standard1";
 
+
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
     {
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
-                                    SimpleStrategy.class,
-                                    KSMetaData.optsWithRF(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF));
+                SimpleStrategy.class,
+                KSMetaData.optsWithRF(1),
+                SchemaLoader.standardCFMD(KEYSPACE1, CF));
+    }
+
+    @After
+    public void truncateCF()
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.truncateBlocking();
     }
 
     @Test
@@ -113,4 +124,83 @@ public class AntiCompactionTest
         assertEquals(repairedKeys, 4);
         assertEquals(nonRepairedKeys, 6);
     }
+
+
+    public void generateSStable(ColumnFamilyStore store, String Suffix)
+    {
+    long timestamp = System.currentTimeMillis();
+    for (int i = 0; i < 10; i++)
+        {
+            DecoratedKey key = Util.dk(Integer.toString(i) + "-" + Suffix);
+            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+            for (int j = 0; j < 10; j++)
+                rm.add("Standard1", Util.cellname(Integer.toString(j)),
+                        ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                        timestamp,
+                        0);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+    }
+
+    @Test
+    public void antiCompactTenSTC() throws InterruptedException, ExecutionException, IOException{
+        antiCompactTen("SizeTieredCompactionStrategy");
+    }
+
+    @Test
+    public void antiCompactTenLC() throws InterruptedException, ExecutionException, IOException{
+        antiCompactTen("LeveledCompactionStrategy");
+    }
+
+    public void antiCompactTen(String compactionStrategy) throws InterruptedException, ExecutionException,
IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE1);
+        ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
+        store.setCompactionStrategyClass(compactionStrategy);
+        store.disableAutoCompaction();
+
+        for (int table = 0; table < 10; table++)
+        {
+            generateSStable(store,Integer.toString(table));
+        }
+        Collection<SSTableReader> sstables = store.getUnrepairedSSTables();
+        assertEquals(store.getSSTables().size(), sstables.size());
+
+        Range<Token> range = new Range<Token>(new BytesToken("0".getBytes()),
new BytesToken("4".getBytes()));
+        List<Range<Token>> ranges = Arrays.asList(range);
+
+        SSTableReader.acquireReferences(sstables);
+        long repairedAt = 1000;
+        CompactionManager.instance.performAnticompaction(store, ranges, sstables, repairedAt);
+        /*
+        Anticompaction will be anti-compacting 10 SSTables but will be doing this two at
a time
+        so there will be no net change in the number of sstables
+         */
+        assertEquals(10, store.getSSTables().size());
+        int repairedKeys = 0;
+        int nonRepairedKeys = 0;
+        for (SSTableReader sstable : store.getSSTables())
+        {
+            SSTableScanner scanner = sstable.getScanner();
+            while (scanner.hasNext())
+            {
+                SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                if (sstable.isRepaired())
+                {
+                    assertTrue(range.contains(row.getKey().getToken()));
+                    assertEquals(repairedAt, sstable.getSSTableMetadata().repairedAt);
+                    repairedKeys++;
+                }
+                else
+                {
+                    assertFalse(range.contains(row.getKey().getToken()));
+                    assertEquals(ActiveRepairService.UNREPAIRED_SSTABLE, sstable.getSSTableMetadata().repairedAt);
+                    nonRepairedKeys++;
+                }
+            }
+        }
+        assertEquals(repairedKeys, 40);
+        assertEquals(nonRepairedKeys, 60);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37f51759/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 5f9b72b..7eec449 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -18,7 +18,13 @@
 package org.apache.cassandra.db.compaction;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 
 import org.junit.After;
 import org.junit.Before;
@@ -30,7 +36,10 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
@@ -84,6 +93,54 @@ public class LeveledCompactionStrategyTest
         cfs.truncateBlocking();
     }
 
+    /**
+     * Ensure that the grouping operation preserves the levels of grouped tables
+     */
+    @Test
+    public void testGrouperLevels() throws Exception{
+        ByteBuffer value = ByteBuffer.wrap(new byte[100 * 1024]); // 100 KB value, make it
easy to have multiple files
+
+        // Enough data to have a level 1 and 2
+        int rows = 20;
+        int columns = 10;
+
+        // Adds enough data to trigger multiple sstable per level
+        for (int r = 0; r < rows; r++)
+        {
+            DecoratedKey key = Util.dk(String.valueOf(r));
+            Mutation rm = new Mutation(KEYSPACE1, key.getKey());
+            for (int c = 0; c < columns; c++)
+            {
+                rm.add(CF_STANDARDDLEVELED, Util.cellname("column" + c), value, 0);
+            }
+            rm.apply();
+            cfs.forceBlockingFlush();
+        }
+
+        waitForLeveling(cfs);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) cfs.getCompactionStrategy();
+        // Checking we're not completely bad at math
+        assert strategy.getLevelSize(1) > 0;
+        assert strategy.getLevelSize(2) > 0;
+
+        Collection<Collection<SSTableReader>> groupedSSTables = cfs.getCompactionStrategy().groupSSTablesForAntiCompaction(cfs.getSSTables());
+        for (Collection<SSTableReader> sstableGroup : groupedSSTables)
+        {
+            int groupLevel = -1;
+            Iterator<SSTableReader> it = sstableGroup.iterator();
+            while (it.hasNext())
+            {
+
+                SSTableReader sstable = it.next();
+                int tableLevel = sstable.getSSTableLevel();
+                if (groupLevel == -1)
+                    groupLevel = tableLevel;
+                assert groupLevel == tableLevel;
+            }
+        }
+
+    }
+
     /*
      * This exercises in particular the code of #4142
      */


Mime
View raw message