cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [1/5] cassandra git commit: Introduce safer durable sstable membership management (and simplify cleanup of compaction leftovers)
Date Fri, 24 Jul 2015 16:32:10 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e338d2fa8 -> b09e60f72


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index 60cac2b..860f1d1 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -100,7 +100,7 @@ public class LegacySSTableTest
     protected Descriptor getDescriptor(String ver)
     {
         File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME);
-        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY);
+        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, SSTableFormat.Type.LEGACY);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 5a7c074..782f7fd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.util.List;
 
 import com.google.common.io.Files;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -33,6 +35,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -40,11 +43,15 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.OutputHandler;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class SSTableLoaderTest
 {
     public static final String KEYSPACE1 = "SSTableLoaderTest";
-    public static final String CF_STANDARD = "Standard1";
+    public static final String CF_STANDARD1 = "Standard1";
+    public static final String CF_STANDARD2 = "Standard2";
+
+    private File tmpdir;
 
     @BeforeClass
     public static void defineSchema() throws Exception
@@ -52,57 +59,66 @@ public class SSTableLoaderTest
         SchemaLoader.prepareServer();
         SchemaLoader.createKeyspace(KEYSPACE1,
                                     KeyspaceParams.simple(1),
-                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
-        setup();
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD2));
+
+        StorageService.instance.initServer();
     }
 
-    public static void setup() throws Exception
+    @Before
+    public void setup() throws Exception
     {
-        StorageService.instance.initServer();
+        tmpdir = Files.createTempDir();
+    }
+
+    @After
+    public void cleanup()
+    {
+        FileUtils.deleteRecursive(tmpdir);
+    }
+
+    private static final class TestClient extends SSTableLoader.Client
+    {
+        private String keyspace;
+
+        public void init(String keyspace)
+        {
+            this.keyspace = keyspace;
+            for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
+                addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
+            setPartitioner(StorageService.getPartitioner());
+        }
+
+        public CFMetaData getTableMetadata(String tableName)
+        {
+            return Schema.instance.getCFMetaData(keyspace, tableName);
+        }
     }
 
     @Test
     public void testLoadingSSTable() throws Exception
     {
-        File tempdir = Files.createTempDir();
-        File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD);
+        File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD1);
         assert dataDir.mkdirs();
-        CFMetaData cfmeta = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD);
+        CFMetaData cfmeta = Schema.instance.getCFMetaData(KEYSPACE1, CF_STANDARD1);
 
         String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
         String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
-                                                           ;
+
         try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                        .inDirectory(dataDir)
                                                        .withPartitioner(StorageService.getPartitioner())
-                                                       .forTable(String.format(schema, KEYSPACE1, CF_STANDARD))
-                                                       .using(String.format(query, KEYSPACE1, CF_STANDARD))
+                                                       .forTable(String.format(schema, KEYSPACE1, CF_STANDARD1))
+                                                       .using(String.format(query, KEYSPACE1, CF_STANDARD1))
                                                        .build())
         {
             writer.addRow("key1", "col1", "100");
         }
 
-        SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
-        {
-            private String keyspace;
-
-            public void init(String keyspace)
-            {
-                this.keyspace = keyspace;
-                for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
-                    addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
-                setPartitioner(StorageService.getPartitioner());
-            }
-
-            public CFMetaData getTableMetadata(String tableName)
-            {
-                return Schema.instance.getCFMetaData(keyspace, tableName);
-            }
-        }, new OutputHandler.SystemOutput(false, false));
-
+        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
         loader.stream().get();
 
-        List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD)).build());
+        List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build());
 
         assertEquals(1, partitions.size());
         assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
@@ -110,4 +126,51 @@ public class SSTableLoaderTest
                                                                    .getCell(cfmeta.getColumnDefinition(ByteBufferUtil.bytes("val")))
                                                                    .value());
     }
+
+    @Test
+    public void testLoadingIncompleteSSTable() throws Exception
+    {
+        File dataDir = new File(tmpdir.getAbsolutePath() + File.separator + KEYSPACE1 + File.separator + CF_STANDARD2);
+        assert dataDir.mkdirs();
+
+        //make sure we have no tables...
+        assertTrue(dataDir.listFiles().length == 0);
+
+        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+
+        CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                  .inDirectory(dataDir)
+                                                  .withPartitioner(StorageService.getPartitioner())
+                                                  .forTable(String.format(schema, KEYSPACE1, CF_STANDARD2))
+                                                  .using(String.format(query, KEYSPACE1, CF_STANDARD2))
+                                                  .withBufferSizeInMB(1)
+                                                  .build();
+
+        for (int i = 0; i < 1000; i++) // make sure to write more than 1 MB
+        {
+            for (int j = 0; j < 100; j++)
+                writer.addRow(String.format("key%d", i), String.format("col%d", j), "100");
+        }
+
+        //make sure we have some tables...
+        assertTrue(dataDir.listFiles().length > 0);
+
+        //writer is still open so loader should not load anything
+        SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
+        loader.stream().get();
+
+        List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
+
+        assertTrue(partitions.size() > 0 && partitions.size() < 1000);
+
+        // now we complete the write and the second loader should load the last sstable as well
+        writer.close();
+
+        loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false));
+        loader.stream().get();
+
+        partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
+        assertEquals(1000, partitions.size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 579f981..0e533c2 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.lifecycle.TransactionLogs;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
 import org.apache.cassandra.db.compaction.CompactionController;
@@ -106,7 +107,7 @@ public class SSTableRewriterTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
         store.truncateBlocking();
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
     }
 
     @Test
@@ -135,16 +136,16 @@ public class SSTableRewriterTest extends SchemaLoader
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
-            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn));
             while(ci.hasNext())
             {
                 writer.append(ci.next());
             }
             writer.finish();
         }
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
         validateCFS(cfs);
-        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
         assertEquals(1, filecounts);
         truncate(cfs);
     }
@@ -167,16 +168,16 @@ public class SSTableRewriterTest extends SchemaLoader
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
-            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn));
             while (ci.hasNext())
             {
                 writer.append(ci.next());
             }
             writer.finish();
         }
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
         validateCFS(cfs);
-        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
         assertEquals(1, filecounts);
     }
 
@@ -200,7 +201,7 @@ public class SSTableRewriterTest extends SchemaLoader
              CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
-            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn));
             while (ci.hasNext())
             {
                 UnfilteredRowIterator row = ci.next();
@@ -229,9 +230,9 @@ public class SSTableRewriterTest extends SchemaLoader
             assertTrue(checked);
             writer.finish();
         }
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
         validateCFS(cfs);
-        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
+        int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list());
         assertEquals(1, filecounts);
         truncate(cfs);
     }
@@ -244,7 +245,8 @@ public class SSTableRewriterTest extends SchemaLoader
         truncate(cfs);
 
         File dir = cfs.directories.getDirectoryForNewSSTables();
-        try (SSTableWriter writer = getWriter(cfs, dir))
+        LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE, cfs.metadata);
+        try (SSTableWriter writer = getWriter(cfs, dir, txn))
         {
             for (int i = 0; i < 10000; i++)
             {
@@ -256,7 +258,7 @@ public class SSTableRewriterTest extends SchemaLoader
 
             SSTableReader s = writer.setMaxDataAge(1000).openEarly();
             assert s != null;
-            assertFileCounts(dir.list(), 2, 2);
+            assertFileCounts(dir.list());
             for (int i = 10000; i < 20000; i++)
             {
                 UpdateBuilder builder = UpdateBuilder.create(cfs.metadata, random(i, 10)).withTimestamp(1);
@@ -266,20 +268,20 @@ public class SSTableRewriterTest extends SchemaLoader
             }
             SSTableReader s2 = writer.setMaxDataAge(1000).openEarly();
             assertTrue(s.last.compareTo(s2.last) < 0);
-            assertFileCounts(dir.list(), 2, 2);
-            s.markObsolete(cfs.getTracker());
+            assertFileCounts(dir.list());
             s.selfRef().release();
             s2.selfRef().release();
             // These checks don't work on Windows because the writer has the channel still
             // open till .abort() is called (via the builder)
             if (!FBUtilities.isWindows())
             {
-                SSTableDeletingTask.waitForDeletions();
-                assertFileCounts(dir.list(), 0, 2);
+                TransactionLogs.waitForDeletions();
+                assertFileCounts(dir.list());
             }
             writer.abort();
-            SSTableDeletingTask.waitForDeletions();
-            int datafiles = assertFileCounts(dir.list(), 0, 0);
+            txn.abort();
+            TransactionLogs.waitForDeletions();
+            int datafiles = assertFileCounts(dir.list());
             assertEquals(datafiles, 0);
             validateCFS(cfs);
         }
@@ -306,14 +308,14 @@ public class SSTableRewriterTest extends SchemaLoader
              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
 
             while(ci.hasNext())
             {
                 rewriter.append(ci.next());
                 if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
                     files++;
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                     assertEquals(s.bytesOnDisk(), cfs.metric.liveDiskSpaceUsed.getCount());
@@ -323,6 +325,9 @@ public class SSTableRewriterTest extends SchemaLoader
             }
             sstables = rewriter.finish();
         }
+
+        TransactionLogs.waitForDeletions();
+
         long sum = 0;
         for (SSTableReader x : cfs.getSSTables())
             sum += x.bytesOnDisk();
@@ -330,11 +335,11 @@ public class SSTableRewriterTest extends SchemaLoader
         assertEquals(startStorageMetricsLoad - sBytesOnDisk + sum, StorageMetrics.load.getCount());
         assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
 
         // tmplink and tmp files should be gone:
         assertEquals(sum, cfs.metric.totalDiskSpaceUsed.getCount());
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertFileCounts(s.descriptor.directory.list());
         validateCFS(cfs);
     }
 
@@ -358,14 +363,14 @@ public class SSTableRewriterTest extends SchemaLoader
              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
 
             while(ci.hasNext())
             {
                 rewriter.append(ci.next());
                 if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
                     files++;
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
@@ -375,9 +380,9 @@ public class SSTableRewriterTest extends SchemaLoader
 
         assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
 
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertFileCounts(s.descriptor.directory.list());
         validateCFS(cfs);
     }
 
@@ -387,7 +392,12 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         testNumberOfFiles_abort(new RewriterTest()
         {
-            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            public void run(ISSTableScanner scanner,
+                            CompactionController controller,
+                            SSTableReader sstable,
+                            ColumnFamilyStore cfs,
+                            SSTableRewriter rewriter,
+                            LifecycleTransaction txn)
             {
                 try (CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
                 {
@@ -397,7 +407,7 @@ public class SSTableRewriterTest extends SchemaLoader
                         rewriter.append(ci.next());
                         if (rewriter.currentWriter().getFilePointer() > 25000000)
                         {
-                            rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory, txn));
                             files++;
                             assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                         }
@@ -413,7 +423,12 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         testNumberOfFiles_abort(new RewriterTest()
         {
-            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            public void run(ISSTableScanner scanner,
+                            CompactionController controller,
+                            SSTableReader sstable,
+                            ColumnFamilyStore cfs,
+                            SSTableRewriter rewriter,
+                            LifecycleTransaction txn)
             {
                 try (CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
                 {
@@ -423,7 +438,7 @@ public class SSTableRewriterTest extends SchemaLoader
                         rewriter.append(ci.next());
                         if (rewriter.currentWriter().getFilePointer() > 25000000)
                         {
-                            rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory, txn));
                             files++;
                             assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                         }
@@ -444,7 +459,12 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         testNumberOfFiles_abort(new RewriterTest()
         {
-            public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter)
+            public void run(ISSTableScanner scanner,
+                            CompactionController controller,
+                            SSTableReader sstable,
+                            ColumnFamilyStore cfs,
+                            SSTableRewriter rewriter,
+                            LifecycleTransaction txn)
             {
                 try(CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
                 {
@@ -454,7 +474,7 @@ public class SSTableRewriterTest extends SchemaLoader
                         rewriter.append(ci.next());
                         if (files == 1 && rewriter.currentWriter().getFilePointer() > 10000000)
                         {
-                            rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory));
+                        rewriter.switchWriter(getWriter(cfs, sstable.descriptor.directory, txn));
                             files++;
                             assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                         }
@@ -467,7 +487,12 @@ public class SSTableRewriterTest extends SchemaLoader
 
     private static interface RewriterTest
     {
-        public void run(ISSTableScanner scanner, CompactionController controller, SSTableReader sstable, ColumnFamilyStore cfs, SSTableRewriter rewriter);
+        public void run(ISSTableScanner scanner,
+                        CompactionController controller,
+                        SSTableReader sstable,
+                        ColumnFamilyStore cfs,
+                        SSTableRewriter rewriter,
+                        LifecycleTransaction txn);
     }
 
     private void testNumberOfFiles_abort(RewriterTest test) throws Exception
@@ -483,21 +508,20 @@ public class SSTableRewriterTest extends SchemaLoader
         DecoratedKey origLast = s.last;
         long startSize = cfs.metric.liveDiskSpaceUsed.getCount();
         Set<SSTableReader> compacting = Sets.newHashSet(s);
-
         try (ISSTableScanner scanner = s.getScanner();
              CompactionController controller = new CompactionController(cfs, compacting, 0);
              LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000))
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
-            test.run(scanner, controller, s, cfs, rewriter);
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
+            test.run(scanner, controller, s, cfs, rewriter, txn);
         }
 
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
 
         assertEquals(startSize, cfs.metric.liveDiskSpaceUsed.getCount());
         assertEquals(1, cfs.getSSTables().size());
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertFileCounts(s.descriptor.directory.list());
         assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
         assertEquals(cfs.getSSTables().iterator().next().last, origLast);
         validateCFS(cfs);
@@ -522,13 +546,13 @@ public class SSTableRewriterTest extends SchemaLoader
              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
             while(ci.hasNext())
             {
                 rewriter.append(ci.next());
                 if (rewriter.currentWriter().getFilePointer() > 2500000)
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
                     files++;
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
@@ -541,10 +565,10 @@ public class SSTableRewriterTest extends SchemaLoader
             }
         }
 
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
 
         assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertFileCounts(s.descriptor.directory.list());
         validateCFS(cfs);
     }
 
@@ -568,13 +592,13 @@ public class SSTableRewriterTest extends SchemaLoader
              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 10000000);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
             while(ci.hasNext())
             {
                 rewriter.append(ci.next());
                 if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
                     files++;
                     assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
                 }
@@ -583,8 +607,8 @@ public class SSTableRewriterTest extends SchemaLoader
             sstables = rewriter.finish();
         }
 
-        SSTableDeletingTask.waitForDeletions();
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        TransactionLogs.waitForDeletions();
+        assertFileCounts(s.descriptor.directory.list());
         validateCFS(cfs);
     }
 
@@ -608,14 +632,14 @@ public class SSTableRewriterTest extends SchemaLoader
              SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false, 1000000);
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID()))
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
             while(ci.hasNext())
             {
                 rewriter.append(ci.next());
                 if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
                 {
                     assertEquals(files, cfs.getSSTables().size()); // all files are now opened early
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
                     files++;
                 }
             }
@@ -624,8 +648,8 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         assertEquals(files, sstables.size());
         assertEquals(files, cfs.getSSTables().size());
-        SSTableDeletingTask.waitForDeletions();
-        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        TransactionLogs.waitForDeletions();
+        assertFileCounts(s.descriptor.directory.list());
 
         validateCFS(cfs);
     }
@@ -643,10 +667,10 @@ public class SSTableRewriterTest extends SchemaLoader
             SSTableSplitter splitter = new SSTableSplitter(cfs, txn, 10);
             splitter.split();
 
-            assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            assertFileCounts(s.descriptor.directory.list());
 
             s.selfRef().release();
-            SSTableDeletingTask.waitForDeletions();
+            TransactionLogs.waitForDeletions();
 
             for (File f : s.descriptor.directory.listFiles())
             {
@@ -697,13 +721,13 @@ public class SSTableRewriterTest extends SchemaLoader
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
             while (ci.hasNext())
             {
                 rewriter.append(ci.next());
                 if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
                 }
             }
             try
@@ -722,9 +746,9 @@ public class SSTableRewriterTest extends SchemaLoader
                 s.selfRef().release();
         }
 
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
 
-        int filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        int filecount = assertFileCounts(s.descriptor.directory.list());
         assertEquals(filecount, 1);
         if (!offline)
         {
@@ -737,7 +761,7 @@ public class SSTableRewriterTest extends SchemaLoader
             assertEquals(0, cfs.getSSTables().size());
             cfs.truncateBlocking();
         }
-        filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        filecount = assertFileCounts(s.descriptor.directory.list());
         if (offline)
         {
             // the file is not added to the CFS, therefore not truncated away above
@@ -746,7 +770,7 @@ public class SSTableRewriterTest extends SchemaLoader
             {
                 FileUtils.deleteRecursive(f);
             }
-            filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
+            filecount = assertFileCounts(s.descriptor.directory.list());
         }
 
         assertEquals(0, filecount);
@@ -787,13 +811,13 @@ public class SSTableRewriterTest extends SchemaLoader
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
-            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+            rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
             while (ci.hasNext())
             {
                 rewriter.append(ci.next());
                 if (keyCount % 10 == 0)
                 {
-                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory, txn));
                 }
                 keyCount++;
                 validateKeys(keyspace);
@@ -801,7 +825,7 @@ public class SSTableRewriterTest extends SchemaLoader
             rewriter.finish();
         }
         validateKeys(keyspace);
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
         validateCFS(cfs);
         truncate(cfs);
     }
@@ -825,7 +849,7 @@ public class SSTableRewriterTest extends SchemaLoader
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, Collections.singletonList(scanner), controller, FBUtilities.nowInSeconds(), UUIDGen.getTimeUUID())
         )
         {
-            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn));
             while (ci.hasNext())
             {
                 writer.append(ci.next());
@@ -870,8 +894,8 @@ public class SSTableRewriterTest extends SchemaLoader
              CompactionIterator ci = new CompactionIterator(OperationType.COMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID())
              )
         {
-            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
-            writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+            writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn));
+            writer2.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory, txn));
             while (ci.hasNext())
             {
                 if (writer.currentWriter().getFilePointer() < 15000000)
@@ -886,7 +910,6 @@ public class SSTableRewriterTest extends SchemaLoader
         validateCFS(cfs);
     }
 
-
     private void validateKeys(Keyspace ks)
     {
         for (int i = 0; i < 100; i++)
@@ -900,8 +923,8 @@ public class SSTableRewriterTest extends SchemaLoader
     public static void truncate(ColumnFamilyStore cfs)
     {
         cfs.truncateBlocking();
-        SSTableDeletingTask.waitForDeletions();
-        Uninterruptibles.sleepUninterruptibly(10L,TimeUnit.MILLISECONDS);
+        TransactionLogs.waitForDeletions();
+        Uninterruptibles.sleepUninterruptibly(10L, TimeUnit.MILLISECONDS);
         assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
         assertEquals(0, cfs.metric.totalDiskSpaceUsed.getCount());
         validateCFS(cfs);
@@ -919,9 +942,9 @@ public class SSTableRewriterTest extends SchemaLoader
         for (int f = 0 ; f < fileCount ; f++)
         {
             File dir = cfs.directories.getDirectoryForNewSSTables();
-            String filename = cfs.getTempSSTablePath(dir);
+            String filename = cfs.getSSTablePath(dir);
 
-            try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
+            try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)))
             {
                 int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount;
                 for ( ; i < end ; i++)
@@ -965,7 +988,7 @@ public class SSTableRewriterTest extends SchemaLoader
         assertTrue(cfs.getTracker().getCompacting().isEmpty());
     }
 
-    public static int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+    public static int assertFileCounts(String [] files)
     {
         int tmplinkcount = 0;
         int tmpcount = 0;
@@ -981,15 +1004,15 @@ public class SSTableRewriterTest extends SchemaLoader
             else if (f.contains("Data"))
                 datacount++;
         }
-        assertEquals(expectedtmplinkCount, tmplinkcount);
-        assertEquals(expectedtmpCount, tmpcount);
+        assertEquals(0, tmplinkcount);
+        assertEquals(0, tmpcount);
         return datacount;
     }
 
-    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+    public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn)
     {
-        String filename = cfs.getTempSSTablePath(directory);
-        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS));
+        String filename = cfs.getSSTablePath(directory);
+        return SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS), txn);
     }
 
     public static ByteBuffer random(int i, int size)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 2c8377f..6de5bb9 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -28,10 +28,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 import org.apache.cassandra.Util;
 import static org.junit.Assert.assertEquals;
@@ -78,7 +75,7 @@ public class SSTableUtils
         File cfDir = new File(tempdir, keyspaceName + File.separator + cfname);
         cfDir.mkdirs();
         cfDir.deleteOnExit();
-        File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation, Descriptor.Type.FINAL).filenameFor("Data.db"));
+        File datafile = new File(new Descriptor(cfDir, keyspaceName, cfname, generation).filenameFor("Data.db"));
         if (!datafile.createNewFile())
             throw new IOException("unable to create file " + datafile);
         datafile.deleteOnExit();
@@ -185,7 +182,7 @@ public class SSTableUtils
             return write(sorted.size(), new Appender()
             {
                 @Override
-                public boolean append(SSTableWriter writer) throws IOException
+                public boolean append(SSTableTxnWriter writer) throws IOException
                 {
                     if (!iter.hasNext())
                         return false;
@@ -208,7 +205,7 @@ public class SSTableUtils
         {
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, generation) : new File(dest.filenameFor(Component.DATA));
             SerializationHeader header = SerializationHeader.make(Schema.instance.getCFMetaData(ksname, cfname), Collections.EMPTY_LIST);
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
+            SSTableTxnWriter writer = SSTableTxnWriter.create(datafile.getAbsolutePath(), expectedSize, ActiveRepairService.UNREPAIRED_SSTABLE, 0, header);
             while (appender.append(writer)) { /* pass */ }
             SSTableReader reader = writer.finish(true);
             // mark all components for removal
@@ -222,6 +219,6 @@ public class SSTableUtils
     public static abstract class Appender
     {
         /** Called with an open writer until it returns false. */
-        public abstract boolean append(SSTableWriter writer) throws IOException;
+        public abstract boolean append(SSTableTxnWriter writer) throws IOException;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index 7051bd3..c763932 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -74,7 +74,7 @@ public class MetadataSerializerTest
             serializer.serialize(originalMetadata, out);
         }
 
-        Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
+        Descriptor desc = new Descriptor( statsFile.getParentFile(), "", "", 0);
         try (RandomAccessReader in = RandomAccessReader.open(statsFile))
         {
             Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/schema/DefsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/schema/DefsTest.java b/test/unit/org/apache/cassandra/schema/DefsTest.java
index b567bb5..2bfd6ae 100644
--- a/test/unit/org/apache/cassandra/schema/DefsTest.java
+++ b/test/unit/org/apache/cassandra/schema/DefsTest.java
@@ -39,12 +39,12 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.TransactionLogs;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.locator.OldNetworkTopologyStrategy;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -536,7 +536,7 @@ public class DefsTest
 
         // check
         assertTrue(cfs.indexManager.getIndexes().isEmpty());
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
         assertFalse(new File(desc.filenameFor(Component.DATA)).exists());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 6227a1f..875c306 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -87,7 +87,7 @@ public class StreamTransferTaskTest
         f.get();
 
         // when timeout runs on second file, task should be completed
-        f = task.scheduleTimeout(1, 1, TimeUnit.MILLISECONDS);
+        f = task.scheduleTimeout(1, 10, TimeUnit.MILLISECONDS);
         task.complete(1);
         try
         {
@@ -97,6 +97,7 @@ public class StreamTransferTaskTest
         catch (CancellationException ex)
         {
         }
+
         assertEquals(StreamSession.State.WAIT_COMPLETE, session.state());
 
         // when all streaming are done, time out task should not be scheduled.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
index 4e160c2..f0c850d 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AbstractTransactionalTest.java
@@ -87,9 +87,29 @@ public abstract class AbstractTransactionalTest
         txn = newTest();
         Throwable t = new RuntimeException();
         txn.testing.prepareToCommit();
-        Assert.assertEquals(t, txn.testing.commit(t));
-        Assert.assertEquals(t, txn.testing.abort(t));
-        Assert.assertTrue(t.getSuppressed()[0] instanceof IllegalStateException);
+
+        if (txn.commitCanThrow())
+        {
+            try
+            {
+                txn.testing.commit(t);
+            }
+            catch (Throwable tt)
+            {
+                Assert.assertEquals(t, tt);
+            }
+
+            Assert.assertEquals(t, txn.testing.abort(t));
+            Assert.assertEquals(0, t.getSuppressed().length);
+        }
+        else
+        {
+            Assert.assertEquals(t, txn.testing.commit(t));
+            Assert.assertEquals(t, txn.testing.abort(t));
+            Assert.assertTrue(t.getSuppressed()[0] instanceof IllegalStateException);
+        }
+
+
     }
 
     @Test
@@ -132,5 +152,10 @@ public abstract class AbstractTransactionalTest
         protected abstract void assertPrepared() throws Exception;
         protected abstract void assertAborted() throws Exception;
         protected abstract void assertCommitted() throws Exception;
+
+        protected boolean commitCanThrow()
+        {
+            return false;
+        }
     }
 }


Mime
View raw message