cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [1/3] cassandra git commit: Refactor TransactionLog code and fix order of cleanup bug on Windows
Date Fri, 18 Sep 2015 12:20:00 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 ff27eb304 -> c163d0bc3
  refs/heads/trunk bcba9b7f6 -> aefb2a6ff


Refactor TransactionLog code and fix order of cleanup bug on Windows

patch by stefania; reviewed by benedict for CASSANDRA-10286


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c163d0bc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c163d0bc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c163d0bc

Branch: refs/heads/cassandra-3.0
Commit: c163d0bc365239f4960ab2e19fb72a0ff785afa8
Parents: ff27eb3
Author: Stefania Alborghetti <stefania.alborghetti@datastax.com>
Authored: Wed Sep 9 11:26:14 2015 +0800
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Fri Sep 18 13:18:05 2015 +0100

----------------------------------------------------------------------
 .../io/compress/CompressedSequentialWriter.java |  3 +--
 .../io/compress/CompressionMetadata.java        |  4 +--
 .../cassandra/io/sstable/SSTableTxnWriter.java  |  4 +--
 .../io/sstable/format/big/BigTableWriter.java   |  8 ++----
 .../io/util/ChecksummedSequentialWriter.java    |  5 +---
 .../cassandra/io/util/SequentialWriter.java     | 27 +-------------------
 .../utils/concurrent/Transactional.java         |  5 ++--
 .../CompressedSequentialWriterTest.java         |  1 -
 .../cassandra/io/sstable/SSTableLoaderTest.java | 10 ++++++--
 .../util/ChecksummedSequentialWriterTest.java   |  1 -
 .../cassandra/io/util/SequentialWriterTest.java |  1 -
 11 files changed, 20 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 8e1ebff..bbec6f5 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -263,7 +263,7 @@ public class CompressedSequentialWriter extends SequentialWriter
         @Override
         protected Throwable doCommit(Throwable accumulate)
         {
-            return metadataWriter.commit(accumulate);
+            return super.doCommit(metadataWriter.commit(accumulate));
         }
 
         @Override
@@ -278,7 +278,6 @@ public class CompressedSequentialWriter extends SequentialWriter
             syncInternal();
             if (descriptor != null)
                 crcMetadata.writeFullChecksum(descriptor);
-            releaseFileHandle();
             sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
             metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 1681b0c..04ef2d3 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -410,7 +410,7 @@ public class CompressionMetadata
             count = chunkIndex;
         }
 
-        protected Throwable doPreCleanup(Throwable failed)
+        protected Throwable doPostCleanup(Throwable failed)
         {
             return offsets.close(failed);
         }
@@ -422,7 +422,7 @@ public class CompressionMetadata
 
         protected Throwable doAbort(Throwable accumulate)
         {
-            return FileUtils.deleteWithConfirm(filePath, false, accumulate);
+            return accumulate;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
index 6e1ac38..5d65a30 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java
@@ -69,13 +69,13 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional
implem
 
     protected Throwable doAbort(Throwable accumulate)
     {
-        return writer.abort(txn.abort(accumulate));
+        return txn.abort(writer.abort(accumulate));
     }
 
     protected void doPrepare()
     {
-        txn.prepareToCommit();
         writer.prepareToCommit();
+        txn.prepareToCommit();
     }
 
     public Collection<SSTableReader> finish(boolean openResult)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 06dd508..d2500b4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -81,10 +81,6 @@ public class BigTableWriter extends SSTableWriter
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false);
         }
         iwriter = new IndexWriter(keyCount, dataFile);
-
-        // txnLogs will delete if safe to do so (early readers)
-        iwriter.indexFile.deleteFile(false);
-        dataFile.deleteFile(false);
     }
 
     public void mark()
@@ -322,7 +318,7 @@ public class BigTableWriter extends SSTableWriter
         }
 
         @Override
-        protected Throwable doPreCleanup(Throwable accumulate)
+        protected Throwable doPostCleanup(Throwable accumulate)
         {
             accumulate = dbuilder.close(accumulate);
             return accumulate;
@@ -485,7 +481,7 @@ public class BigTableWriter extends SSTableWriter
         }
 
         @Override
-        protected Throwable doPreCleanup(Throwable accumulate)
+        protected Throwable doPostCleanup(Throwable accumulate)
         {
             accumulate = summary.close(accumulate);
             accumulate = bf.close(accumulate);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index 8203a37..fd88151 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -50,7 +50,7 @@ public class ChecksummedSequentialWriter extends SequentialWriter
         @Override
         protected Throwable doCommit(Throwable accumulate)
         {
-            return crcWriter.commit(accumulate);
+            return super.doCommit(crcWriter.commit(accumulate));
         }
 
         @Override
@@ -66,9 +66,6 @@ public class ChecksummedSequentialWriter extends SequentialWriter
             if (descriptor != null)
                 crcMetadata.writeFullChecksum(descriptor);
             crcWriter.setDescriptor(descriptor).prepareToCommit();
-            // we must cleanup our file handles during prepareCommit for Windows compatibility
as we cannot rename an open file;
-            // TODO: once we stop file renaming, remove this for clarity
-            releaseFileHandle();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index 6000f95..5bdc15a 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -68,8 +68,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements
Tr
     // due to lack of multiple-inheritance, we proxy our transactional implementation
     protected class TransactionalProxy extends AbstractTransactional
     {
-        private boolean deleteFile = true;
-
         @Override
         protected Throwable doPreCleanup(Throwable accumulate)
         {
@@ -90,9 +88,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements
Tr
         protected void doPrepare()
         {
             syncInternal();
-            // we must cleanup our file handles during prepareCommit for Windows compatibility
as we cannot rename an open file;
-            // TODO: once we stop file renaming, remove this for clarity
-            releaseFileHandle();
         }
 
         protected Throwable doCommit(Throwable accumulate)
@@ -102,10 +97,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements
Tr
 
         protected Throwable doAbort(Throwable accumulate)
         {
-            if (deleteFile)
-                return FileUtils.deleteWithConfirm(filePath, false, accumulate);
-            else
-                return accumulate;
+            return accumulate;
         }
     }
 
@@ -409,23 +401,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements
Tr
         return new TransactionalProxy();
     }
 
-    public void deleteFile(boolean val)
-    {
-        txnProxy.deleteFile = val;
-    }
-
-    public void releaseFileHandle()
-    {
-        try
-        {
-            channel.close();
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, filePath);
-        }
-    }
-
     /**
      * Class to hold a mark to the position of the file
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
index 02562ce..d142f06 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -88,7 +88,8 @@ public interface Transactional extends AutoCloseable
         // Transactional objects will perform cleanup in the commit() or abort() calls
 
         /**
-         * perform an exception-safe pre-abort cleanup; this will still be run *after* commit
+         * perform an exception-safe pre-abort/commit cleanup;
+         * this will be run after prepareToCommit (so before commit), and before abort
          */
         protected Throwable doPreCleanup(Throwable accumulate){ return accumulate; }
 
@@ -113,7 +114,6 @@ public interface Transactional extends AutoCloseable
             if (state != State.READY_TO_COMMIT)
                 throw new IllegalStateException("Cannot commit unless READY_TO_COMMIT; state
is " + state);
             accumulate = doCommit(accumulate);
-            accumulate = doPreCleanup(accumulate);
             accumulate = doPostCleanup(accumulate);
             state = State.COMMITTED;
             return accumulate;
@@ -171,6 +171,7 @@ public interface Transactional extends AutoCloseable
                 throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS;
state is " + state);
 
             doPrepare();
+            maybeFail(doPreCleanup(null));
             state = State.READY_TO_COMMIT;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 1bdc591..56c83da 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -222,7 +222,6 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest
         protected void assertAborted() throws Exception
         {
             super.assertAborted();
-            Assert.assertFalse(offsetsFile.exists());
         }
 
         void cleanup()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index faa9c3e..ad7523d 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -131,11 +131,14 @@ public class SSTableLoaderTest
             writer.addRow("key1", "col1", "100");
         }
 
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+        cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able
to stream them
+
         final CountDownLatch latch = new CountDownLatch(1);
         SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false,
false));
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
-        List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build());
+        List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
 
         assertEquals(1, partitions.size());
         assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey()));
@@ -175,6 +178,9 @@ public class SSTableLoaderTest
                 writer.addRow(String.format("key%d", i), String.format("col%d", j), "100");
         }
 
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2);
+        cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able
to stream them
+
         //make sure we have some tables...
         assertTrue(dataDir.listFiles().length > 0);
 
@@ -183,7 +189,7 @@ public class SSTableLoaderTest
         SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false,
false));
         loader.stream(Collections.emptySet(), completionStreamListener(latch)).get();
 
-        List<FilteredPartition> partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
+        List<FilteredPartition> partitions = Util.getAll(Util.cmd(cfs).build());
 
         assertTrue(partitions.size() > 0 && partitions.size() < NB_PARTITIONS);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
index 9731a8d..bea3aac 100644
--- a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java
@@ -85,7 +85,6 @@ public class ChecksummedSequentialWriterTest extends SequentialWriterTest
         protected void assertAborted() throws Exception
         {
             super.assertAborted();
-            Assert.assertFalse(crcFile.exists());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
index fd38427..f5a366e 100644
--- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java
@@ -102,7 +102,6 @@ public class SequentialWriterTest extends AbstractTransactionalTest
         protected void assertAborted() throws Exception
         {
             Assert.assertFalse(writer.isOpen());
-            Assert.assertFalse(file.exists());
         }
 
         protected void assertCommitted() throws Exception


Mime
View raw message