Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A9C3179AD for ; Fri, 18 Sep 2015 12:20:01 +0000 (UTC) Received: (qmail 51827 invoked by uid 500); 18 Sep 2015 12:20:01 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 51785 invoked by uid 500); 18 Sep 2015 12:20:01 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 51763 invoked by uid 99); 18 Sep 2015 12:20:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Sep 2015 12:20:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D1E5AE03A7; Fri, 18 Sep 2015 12:20:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benedict@apache.org To: commits@cassandra.apache.org Date: Fri, 18 Sep 2015 12:20:00 -0000 Message-Id: <0d53a52a9a8e4a8d9445ada2ffc3b5b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] cassandra git commit: Refactor TransactionLog code and fix order of cleanup bug on Windows Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 ff27eb304 -> c163d0bc3 refs/heads/trunk bcba9b7f6 -> aefb2a6ff Refactor TransactionLog code and fix order of cleanup bug on Windows patch by stefania; reviewed by benedict for CASSANDRA-10286 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c163d0bc Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c163d0bc Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c163d0bc Branch: refs/heads/cassandra-3.0 Commit: c163d0bc365239f4960ab2e19fb72a0ff785afa8 Parents: ff27eb3 Author: Stefania Alborghetti Authored: Wed Sep 9 11:26:14 2015 +0800 Committer: Benedict Elliott Smith Committed: Fri Sep 18 13:18:05 2015 +0100 ---------------------------------------------------------------------- .../io/compress/CompressedSequentialWriter.java | 3 +-- .../io/compress/CompressionMetadata.java | 4 +-- .../cassandra/io/sstable/SSTableTxnWriter.java | 4 +-- .../io/sstable/format/big/BigTableWriter.java | 8 ++---- .../io/util/ChecksummedSequentialWriter.java | 5 +--- .../cassandra/io/util/SequentialWriter.java | 27 +------------------- .../utils/concurrent/Transactional.java | 5 ++-- .../CompressedSequentialWriterTest.java | 1 - .../cassandra/io/sstable/SSTableLoaderTest.java | 10 ++++++-- .../util/ChecksummedSequentialWriterTest.java | 1 - .../cassandra/io/util/SequentialWriterTest.java | 1 - 11 files changed, 20 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 8e1ebff..bbec6f5 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -263,7 +263,7 @@ public class CompressedSequentialWriter extends SequentialWriter @Override protected Throwable doCommit(Throwable accumulate) { - return metadataWriter.commit(accumulate); + return super.doCommit(metadataWriter.commit(accumulate)); } @Override @@ -278,7 +278,6 @@ public class CompressedSequentialWriter extends SequentialWriter syncInternal(); if (descriptor != null) crcMetadata.writeFullChecksum(descriptor); - releaseFileHandle(); sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize); metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index 1681b0c..04ef2d3 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -410,7 +410,7 @@ public class CompressionMetadata count = chunkIndex; } - protected Throwable doPreCleanup(Throwable failed) + protected Throwable doPostCleanup(Throwable failed) { return offsets.close(failed); } @@ -422,7 +422,7 @@ public class CompressionMetadata protected Throwable doAbort(Throwable accumulate) { - return FileUtils.deleteWithConfirm(filePath, false, accumulate); + return accumulate; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java index 6e1ac38..5d65a30 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableTxnWriter.java @@ -69,13 +69,13 @@ public class SSTableTxnWriter extends Transactional.AbstractTransactional implem protected Throwable doAbort(Throwable accumulate) { - return writer.abort(txn.abort(accumulate)); + return txn.abort(writer.abort(accumulate)); } protected void doPrepare() { - txn.prepareToCommit(); writer.prepareToCommit(); + txn.prepareToCommit(); } public Collection finish(boolean openResult) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 06dd508..d2500b4 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -81,10 +81,6 @@ public class BigTableWriter extends SSTableWriter dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode(), false); } iwriter = new IndexWriter(keyCount, dataFile); - - // txnLogs will delete if safe to do so (early readers) - iwriter.indexFile.deleteFile(false); - dataFile.deleteFile(false); } public void mark() @@ -322,7 +318,7 @@ public class BigTableWriter extends SSTableWriter } @Override - protected Throwable doPreCleanup(Throwable accumulate) + protected Throwable doPostCleanup(Throwable accumulate) { accumulate = dbuilder.close(accumulate); return accumulate; @@ -485,7 +481,7 @@ public class BigTableWriter extends SSTableWriter } @Override - protected Throwable doPreCleanup(Throwable accumulate) + protected Throwable doPostCleanup(Throwable accumulate) { accumulate = summary.close(accumulate); accumulate = bf.close(accumulate); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java index 8203a37..fd88151 100644 --- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java @@ -50,7 +50,7 @@ public class ChecksummedSequentialWriter extends SequentialWriter @Override protected Throwable doCommit(Throwable accumulate) { - return crcWriter.commit(accumulate); + return super.doCommit(crcWriter.commit(accumulate)); } @Override @@ -66,9 +66,6 @@ public class ChecksummedSequentialWriter extends SequentialWriter if (descriptor != null) crcMetadata.writeFullChecksum(descriptor); crcWriter.setDescriptor(descriptor).prepareToCommit(); - // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file; - // TODO: once we stop file renaming, remove this for clarity - releaseFileHandle(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 6000f95..5bdc15a 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -68,8 +68,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr // due to lack of multiple-inheritance, we proxy our transactional implementation protected class TransactionalProxy extends AbstractTransactional { - private boolean deleteFile = true; - @Override protected Throwable doPreCleanup(Throwable accumulate) { @@ -90,9 +88,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr protected void doPrepare() { syncInternal(); - // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file; - // TODO: once we stop file renaming, remove this for clarity - releaseFileHandle(); } protected Throwable doCommit(Throwable accumulate) @@ -102,10 +97,7 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr protected Throwable doAbort(Throwable accumulate) { - if (deleteFile) - return FileUtils.deleteWithConfirm(filePath, false, accumulate); - else - return accumulate; + return accumulate; } } @@ -409,23 +401,6 @@ public class SequentialWriter extends BufferedDataOutputStreamPlus implements Tr return new TransactionalProxy(); } - public void deleteFile(boolean val) - { - txnProxy.deleteFile = val; - } - - public void releaseFileHandle() - { - try - { - channel.close(); - } - catch (IOException e) - { - throw new FSWriteError(e, filePath); - } - } - /** * Class to hold a mark to the position of the file */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/src/java/org/apache/cassandra/utils/concurrent/Transactional.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java index 02562ce..d142f06 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java @@ -88,7 +88,8 @@ public interface Transactional extends AutoCloseable // Transactional objects will perform cleanup in the commit() or abort() calls /** - * perform an exception-safe pre-abort cleanup; this will still be run *after* commit + * perform an exception-safe pre-abort/commit cleanup; + * this will be run after prepareToCommit (so before commit), and before abort */ protected Throwable doPreCleanup(Throwable accumulate){ return accumulate; } @@ -113,7 +114,6 @@ public interface Transactional extends AutoCloseable if (state != State.READY_TO_COMMIT) throw new IllegalStateException("Cannot commit unless READY_TO_COMMIT; state is " + state); accumulate = doCommit(accumulate); - accumulate = doPreCleanup(accumulate); accumulate = doPostCleanup(accumulate); state = State.COMMITTED; return accumulate; @@ -171,6 +171,7 @@ public interface Transactional extends AutoCloseable throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state); doPrepare(); + maybeFail(doPreCleanup(null)); state = State.READY_TO_COMMIT; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java index 1bdc591..56c83da 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java @@ -222,7 +222,6 @@ public class CompressedSequentialWriterTest extends SequentialWriterTest protected void assertAborted() throws Exception { super.assertAborted(); - Assert.assertFalse(offsetsFile.exists()); } void cleanup() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index faa9c3e..ad7523d 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -131,11 +131,14 @@ public class SSTableLoaderTest writer.addRow("key1", "col1", "100"); } + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1); + cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them + final CountDownLatch latch = new CountDownLatch(1); SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); - List partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1)).build()); + List partitions = Util.getAll(Util.cmd(cfs).build()); assertEquals(1, partitions.size()); assertEquals("key1", AsciiType.instance.getString(partitions.get(0).partitionKey().getKey())); @@ -175,6 +178,9 @@ public class SSTableLoaderTest writer.addRow(String.format("key%d", i), String.format("col%d", j), "100"); } + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2); + cfs.forceBlockingFlush(); // wait for sstables to be on disk else we won't be able to stream them + //make sure we have some tables... assertTrue(dataDir.listFiles().length > 0); @@ -183,7 +189,7 @@ public class SSTableLoaderTest SSTableLoader loader = new SSTableLoader(dataDir, new TestClient(), new OutputHandler.SystemOutput(false, false)); loader.stream(Collections.emptySet(), completionStreamListener(latch)).get(); - List partitions = Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build()); + List partitions = Util.getAll(Util.cmd(cfs).build()); assertTrue(partitions.size() > 0 && partitions.size() < NB_PARTITIONS); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java index 9731a8d..bea3aac 100644 --- a/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/util/ChecksummedSequentialWriterTest.java @@ -85,7 +85,6 @@ public class ChecksummedSequentialWriterTest extends SequentialWriterTest protected void assertAborted() throws Exception { super.assertAborted(); - Assert.assertFalse(crcFile.exists()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c163d0bc/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java index fd38427..f5a366e 100644 --- a/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java +++ b/test/unit/org/apache/cassandra/io/util/SequentialWriterTest.java @@ -102,7 +102,6 @@ public class SequentialWriterTest extends AbstractTransactionalTest protected void assertAborted() throws Exception { Assert.assertFalse(writer.isOpen()); - Assert.assertFalse(file.exists()); } protected void assertCommitted() throws Exception