cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [1/7] cassandra git commit: Extend Transactional API to sstable lifecycle management
Date Fri, 22 May 2015 08:46:32 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 15d424e86 -> d96a02a12


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e5a76bdb/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 5dca589..fa91d00 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -18,14 +18,16 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.BeforeClass;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -43,6 +45,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.db.compaction.SSTableSplitter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.metrics.StorageMetrics;
@@ -52,7 +55,6 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
 import static org.junit.Assert.*;
-import static org.apache.cassandra.utils.Throwables.maybeFail;
 
 public class SSTableRewriterTest extends SchemaLoader
 {
@@ -83,7 +85,9 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+
         for (int j = 0; j < 100; j ++)
         {
             ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
@@ -94,8 +98,10 @@ public class SSTableRewriterTest extends SchemaLoader
         cfs.forceBlockingFlush();
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
-             AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        assertEquals(sstables.iterator().next().bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.getCount());
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
+             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -105,30 +111,29 @@ public class SSTableRewriterTest extends SchemaLoader
                 AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
                 writer.append(row);
             }
-            Collection<SSTableReader> newsstables = writer.finish();
-            cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables , OperationType.COMPACTION);
+            writer.finish();
         }
         SSTableDeletingTask.waitForDeletions();
-
         validateCFS(cfs);
         int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
         assertEquals(1, filecounts);
-
+        truncate(cfs);
     }
     @Test
     public void basicTest2() throws InterruptedException
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
         Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
         assertEquals(1, sstables.size());
         SSTableRewriter.overrideOpenInterval(10000000);
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
-             AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
+             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -138,11 +143,9 @@ public class SSTableRewriterTest extends SchemaLoader
                 AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
                 writer.append(row);
             }
-            Collection<SSTableReader> newsstables = writer.finish();
-            cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+            writer.finish();
         }
         SSTableDeletingTask.waitForDeletions();
-
         validateCFS(cfs);
         int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
         assertEquals(1, filecounts);
@@ -153,7 +156,9 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -161,8 +166,9 @@ public class SSTableRewriterTest extends SchemaLoader
         assertEquals(1, sstables.size());
         SSTableRewriter.overrideOpenInterval(10000000);
         boolean checked = false;
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
-             AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);)
+        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
+             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             ISSTableScanner scanner = scanners.scanners.get(0);
             CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
@@ -178,7 +184,7 @@ public class SSTableRewriterTest extends SchemaLoader
                     {
                         if (sstable.openReason == SSTableReader.OpenReason.EARLY)
                         {
-                            SSTableReader c = sstables.iterator().next();
+                            SSTableReader c = txn.current(sstables.iterator().next());
                             Collection<Range<Token>> r = Arrays.asList(new Range<>(cfs.partitioner.getMinimumToken(), cfs.partitioner.getMinimumToken()));
                             List<Pair<Long, Long>> tmplinkPositions = sstable.getPositionsForRanges(r);
                             List<Pair<Long, Long>> compactingPositions = c.getPositionsForRanges(r);
@@ -193,17 +199,14 @@ public class SSTableRewriterTest extends SchemaLoader
                 }
             }
             assertTrue(checked);
-            Collection<SSTableReader> newsstables = writer.finish();
-            cfs.getDataTracker().markCompactedSSTablesReplaced(sstables, newsstables, OperationType.COMPACTION);
+            writer.finish();
         }
         SSTableDeletingTask.waitForDeletions();
-
         validateCFS(cfs);
         int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
         assertEquals(1, filecounts);
-        cfs.truncateBlocking();
+        truncate(cfs);
         SSTableDeletingTask.waitForDeletions();
-
         validateCFS(cfs);
     }
 
@@ -212,7 +215,8 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         for (int i = 0; i < 100; i++)
             cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
@@ -253,7 +257,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -263,9 +267,10 @@ public class SSTableRewriterTest extends SchemaLoader
 
         List<SSTableReader> sstables;
         int files = 1;
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-             ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
@@ -282,8 +287,6 @@ public class SSTableRewriterTest extends SchemaLoader
                 }
             }
             sstables = rewriter.finish();
-            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-            assertEquals(files, sstables.size());
         }
         long sum = 0;
         for (SSTableReader x : cfs.getSSTables())
@@ -305,7 +308,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -315,9 +318,10 @@ public class SSTableRewriterTest extends SchemaLoader
 
         List<SSTableReader> sstables;
         int files = 1;
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-             ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
@@ -335,10 +339,6 @@ public class SSTableRewriterTest extends SchemaLoader
 
         assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
-        assertEquals(1, cfs.getDataTracker().getView().shadowed.size());
-        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
-        assertEquals(files, cfs.getSSTables().size());
-        assertEquals(0, cfs.getDataTracker().getView().shadowed.size());
         SSTableDeletingTask.waitForDeletions();
 
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
@@ -429,7 +429,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -440,9 +440,10 @@ public class SSTableRewriterTest extends SchemaLoader
         Set<SSTableReader> compacting = Sets.newHashSet(s);
         SSTableRewriter.overrideOpenInterval(10000000);
 
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-             ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             test.run(scanner, controller, s, cfs, rewriter);
@@ -463,7 +464,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -472,9 +473,10 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableRewriter.overrideOpenInterval(10000000);
 
         int files = 1;
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-             ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
@@ -489,8 +491,7 @@ public class SSTableRewriterTest extends SchemaLoader
                 if (files == 3)
                 {
                     //testing to finish when we have nothing written in the new file
-                    List<SSTableReader> sstables = rewriter.finish();
-                    cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
+                    rewriter.finish();
                     break;
                 }
             }
@@ -508,7 +509,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         cfs.disableAutoCompaction();
 
         SSTableReader s = writeFile(cfs, 1000);
@@ -518,9 +519,10 @@ public class SSTableRewriterTest extends SchemaLoader
 
         List<SSTableReader> sstables;
         int files = 1;
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-             ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
@@ -535,7 +537,6 @@ public class SSTableRewriterTest extends SchemaLoader
             }
 
             sstables = rewriter.finish();
-            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         }
 
         SSTableDeletingTask.waitForDeletions();
@@ -550,7 +551,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         cfs.disableAutoCompaction();
 
         SSTableReader s = writeFile(cfs, 400);
@@ -560,9 +561,10 @@ public class SSTableRewriterTest extends SchemaLoader
 
         List<SSTableReader> sstables;
         int files = 1;
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-             ISSTableScanner scanner = s.getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = s.getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);)
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while(scanner.hasNext())
@@ -577,7 +579,6 @@ public class SSTableRewriterTest extends SchemaLoader
             }
 
             sstables = rewriter.finish();
-            cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
         }
 
         assertEquals(files, sstables.size());
@@ -593,22 +594,24 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         cfs.disableAutoCompaction();
         SSTableReader s = writeFile(cfs, 1000);
-        cfs.getDataTracker().markCompacting(Arrays.asList(s), true, false);
-        SSTableSplitter splitter = new SSTableSplitter(cfs, s, 10);
-        splitter.split();
+        try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.UNKNOWN, s))
+        {
+            SSTableSplitter splitter = new SSTableSplitter(cfs, txn, 10);
+            splitter.split();
 
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            assertFileCounts(s.descriptor.directory.list(), 0, 0);
 
-        s.selfRef().release();
-        SSTableDeletingTask.waitForDeletions();
+            s.selfRef().release();
+            SSTableDeletingTask.waitForDeletions();
 
-        for (File f : s.descriptor.directory.listFiles())
-        {
-            // we need to clear out the data dir, otherwise tests running after this breaks
-            FileUtils.deleteRecursive(f);
+            for (File f : s.descriptor.directory.listFiles())
+            {
+                // we need to clear out the data dir, otherwise tests running after this breaks
+                FileUtils.deleteRecursive(f);
+            }
         }
     }
 
@@ -639,17 +642,18 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         SSTableReader s = writeFile(cfs, 1000);
         if (!offline)
             cfs.addSSTable(s);
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-        cfs.getDataTracker().markCompacting(compacting);
         SSTableRewriter.overrideOpenInterval(10000000);
-
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, offline);
-             ISSTableScanner scanner = compacting.iterator().next().getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = offline ? LifecycleTransaction.offline(OperationType.UNKNOWN, compacting)
+                                       : cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, offline);
+        )
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while (scanner.hasNext())
@@ -672,7 +676,6 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         finally
         {
-            cfs.getDataTracker().unmarkCompacting(compacting);
             if (offline)
                 s.selfRef().release();
         }
@@ -686,9 +689,8 @@ public class SSTableRewriterTest extends SchemaLoader
             assertEquals(1, cfs.getSSTables().size());
             validateCFS(cfs);
         }
-        cfs.truncateBlocking();
+        truncate(cfs);
         SSTableDeletingTask.waitForDeletions();
-
         filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
         if (offline)
         {
@@ -709,7 +711,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         for (int i = 0; i < 100; i++)
         {
             DecoratedKey key = Util.dk(Integer.toString(i));
@@ -726,14 +728,14 @@ public class SSTableRewriterTest extends SchemaLoader
         SSTableReader s = cfs.getSSTables().iterator().next();
         Set<SSTableReader> compacting = new HashSet<>();
         compacting.add(s);
-        cfs.getDataTracker().markCompacting(compacting);
-
 
         SSTableRewriter.overrideOpenInterval(1);
         int keyCount = 0;
-        try (SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, false);
-             ISSTableScanner scanner = compacting.iterator().next().getScanner();
-             CompactionController controller = new CompactionController(cfs, compacting, 0))
+        try (ISSTableScanner scanner = compacting.iterator().next().getScanner();
+             CompactionController controller = new CompactionController(cfs, compacting, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
+             SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);
+        )
         {
             rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
             while (scanner.hasNext())
@@ -746,15 +748,7 @@ public class SSTableRewriterTest extends SchemaLoader
                 keyCount++;
                 validateKeys(keyspace);
             }
-            try
-            {
-                cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finish(), OperationType.COMPACTION);
-                cfs.getDataTracker().unmarkCompacting(compacting);
-            }
-            catch (Throwable t)
-            {
-                rewriter.abort();
-            }
+            rewriter.finish();
         }
         validateKeys(keyspace);
         SSTableDeletingTask.waitForDeletions();
@@ -762,7 +756,7 @@ public class SSTableRewriterTest extends SchemaLoader
     }
 
     @Test
-    public void testCanonicalView() throws IOException
+    public void testCanonicalView() throws Exception
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
@@ -770,15 +764,16 @@ public class SSTableRewriterTest extends SchemaLoader
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
-        Set<SSTableReader> sstables = Sets.newHashSet(cfs.markAllCompacting());
+        Set<SSTableReader> sstables = Sets.newHashSet(s);
         assertEquals(1, sstables.size());
         SSTableRewriter.overrideOpenInterval(10000000);
-        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, false);
         boolean checked = false;
-        try (AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables))
+        try (ISSTableScanner scanner = sstables.iterator().next().getScanner();
+             CompactionController controller = new CompactionController(cfs, sstables, 0);
+             LifecycleTransaction txn = cfs.getTracker().tryModify(sstables, OperationType.UNKNOWN);
+             SSTableRewriter writer = new SSTableRewriter(cfs, txn, 1000, false);
+        )
         {
-            ISSTableScanner scanner = scanners.scanners.get(0);
-            CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
             writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
             while (scanner.hasNext())
             {
@@ -796,10 +791,7 @@ public class SSTableRewriterTest extends SchemaLoader
                 }
             }
         }
-        writer.abort();
-        cfs.getDataTracker().unmarkCompacting(sstables);
-        cfs.truncateBlocking();
-        SSTableDeletingTask.waitForDeletions();
+        truncateCF();
         validateCFS(cfs);
     }
 
@@ -813,30 +805,52 @@ public class SSTableRewriterTest extends SchemaLoader
         }
     }
 
-    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+    public static void truncate(ColumnFamilyStore cfs)
     {
-        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-        for (int i = 0; i < count / 100; i++)
-            cf.addColumn(Util.cellname(i), random(0, 1000), 1);
-        File dir = cfs.directories.getDirectoryForNewSSTables();
-        String filename = cfs.getTempSSTablePath(dir);
+        cfs.truncateBlocking();
+        Uninterruptibles.sleepUninterruptibly(10L,TimeUnit.MILLISECONDS);
+        assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
+    }
 
-        try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0);)
+    public static SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+    {
+        return Iterables.getFirst(writeFiles(cfs, 1, count * 5, count / 100, 1000), null);
+    }
+
+    public static Set<SSTableReader> writeFiles(ColumnFamilyStore cfs, int fileCount, int partitionCount, int cellCount, int cellSize)
+    {
+        int i = 0;
+        Set<SSTableReader> result = new LinkedHashSet<>();
+        for (int f = 0 ; f < fileCount ; f++)
         {
-            for (int i = 0; i < count * 5; i++)
+            File dir = cfs.directories.getDirectoryForNewSSTables();
+            String filename = cfs.getTempSSTablePath(dir);
+
+            SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
+            int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
+            for ( ; i < end ; i++)
+            {
+                ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+                for (int j = 0; j < cellCount ; j++)
+                    cf.addColumn(Util.cellname(j), random(0, cellSize), 1);
                 writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-            return writer.finish(true);
+            }
+            result.add(writer.finish(true));
         }
+        return result;
     }
 
-    private void validateCFS(ColumnFamilyStore cfs)
+    public static void validateCFS(ColumnFamilyStore cfs)
     {
         Set<Integer> liveDescriptors = new HashSet<>();
+        long spaceUsed = 0;
         for (SSTableReader sstable : cfs.getSSTables())
         {
             assertFalse(sstable.isMarkedCompacted());
             assertEquals(1, sstable.selfRef().globalCount());
             liveDescriptors.add(sstable.descriptor.generation);
+            spaceUsed += sstable.bytesOnDisk();
         }
         for (File dir : cfs.directories.getCFDirectories())
         {
@@ -849,11 +863,13 @@ public class SSTableRewriterTest extends SchemaLoader
                 }
             }
         }
-        assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+        assertEquals(spaceUsed, cfs.metric.liveDiskSpaceUsed.getCount());
+        assertEquals(spaceUsed, cfs.metric.totalDiskSpaceUsed.getCount());
+        assertTrue(cfs.getTracker().getCompacting().isEmpty());
     }
 
 
-    private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+    public static int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
     {
         int tmplinkcount = 0;
         int tmpcount = 0;
@@ -874,7 +890,7 @@ public class SSTableRewriterTest extends SchemaLoader
         return datacount;
     }
 
-    private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
     {
         String filename = cfs.getTempSSTablePath(directory);
         return SSTableWriter.create(filename, 0, 0);


Mime
View raw message