Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3AF4A18030 for ; Wed, 19 Aug 2015 01:16:53 +0000 (UTC) Received: (qmail 28974 invoked by uid 500); 19 Aug 2015 01:16:53 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 28815 invoked by uid 500); 19 Aug 2015 01:16:53 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 28756 invoked by uid 99); 19 Aug 2015 01:16:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Aug 2015 01:16:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AB30DE053F; Wed, 19 Aug 2015 01:16:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Wed, 19 Aug 2015 01:16:54 -0000 Message-Id: <4913ba84ed4e454b951693ec7b72f606@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/8] cassandra git commit: Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush. Reviewed by aweisberg for CASSANDRA-9749 Honors commit log policy when replaying; treats errors in the last segment (section for compressed files) as permissible due to incomplete flush. 
Reviewed by aweisberg for CASSANDRA-9749 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fce8478f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fce8478f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fce8478f Branch: refs/heads/trunk Commit: fce8478ff43232d7e3623e457b1252f9700b451d Parents: 6d9fa37 Author: Branimir Lambov Authored: Thu Jul 30 20:59:16 2015 +0300 Committer: Jonathan Ellis Committed: Tue Aug 18 20:13:26 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/commitlog/CommitLog.java | 4 +- .../db/commitlog/CommitLogReplayer.java | 120 ++++- .../CommitLog-5-1438186885380.log | Bin 0 -> 839051 bytes .../legacy-commitlog/2.2-lz4-bitrot/hash.txt | 6 + .../CommitLog-5-1438186885380.log | Bin 0 -> 839051 bytes .../legacy-commitlog/2.2-lz4-bitrot2/hash.txt | 6 + .../CommitLog-5-1438186885380.log | Bin 0 -> 839001 bytes .../legacy-commitlog/2.2-lz4-truncated/hash.txt | 5 + .../db/CommitLogFailurePolicyTest.java | 141 ----- .../org/apache/cassandra/db/CommitLogTest.java | 421 --------------- .../commitlog/CommitLogFailurePolicyTest.java | 140 +++++ .../cassandra/db/commitlog/CommitLogTest.java | 535 +++++++++++++++++++ .../db/commitlog/CommitLogUpgradeTest.java | 55 ++ 14 files changed, 844 insertions(+), 590 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 1299aa2..b9af440 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.1 + * Apply commit_failure_policy to more errors on startup (CASSANDRA-9749) * Fix histogram overflow exception (CASSANDRA-9973) * Route gossip messages over dedicated socket (CASSANDRA-9237) * Add checksum to saved cache files 
(CASSANDRA-9265) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index b3f944d..f23ebae 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -189,7 +189,9 @@ public class CommitLog implements CommitLogMBean */ public void recover(String path) throws IOException { - recover(new File(path)); + CommitLogReplayer recovery = CommitLogReplayer.create(); + recovery.recover(new File(path), false); + recovery.blockForWrites(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 176f64b..af515d2 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -34,12 +34,9 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; - import org.apache.commons.lang3.StringUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.github.tjake.ICRC32; import org.apache.cassandra.concurrent.Stage; @@ -64,6 +61,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet; public class CommitLogReplayer { + static final String IGNORE_REPLAY_ERRORS_PROPERTY = "cassandra.commitlog.ignorereplayerrors"; private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); private static final 
int MAX_OUTSTANDING_REPLAY_COUNT = Integer.getInteger("cassandra.commitlog_max_outstanding_replay_count", 1024); private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; @@ -142,14 +140,15 @@ public class CommitLogReplayer public void recover(File[] clogs) throws IOException { - for (final File file : clogs) - recover(file); + int i; + for (i = 0; i < clogs.length; ++i) + recover(clogs[i], i + 1 == clogs.length); } public int blockForWrites() { for (Map.Entry entry : invalidMutations.entrySet()) - logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey())); + logger.warn(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey())); // wait for all the writes to finish on the mutation stage FBUtilities.waitOnFutures(futures); @@ -163,7 +162,7 @@ public class CommitLogReplayer return replayedCount.get(); } - private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader) throws IOException + private int readSyncMarker(CommitLogDescriptor descriptor, int offset, RandomAccessReader reader, boolean tolerateTruncation) throws IOException { if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE) { @@ -181,13 +180,17 @@ public class CommitLogReplayer { if (end != 0 || filecrc != 0) { - logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath()); + handleReplayError(false, + "Encountered bad header at position %d of commit log %s, with invalid CRC. 
" + + "The end of segment marker should be zero.", + offset, reader.getPath()); } return -1; } else if (end < offset || end > reader.length()) { - logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath()); + handleReplayError(tolerateTruncation, "Encountered bad header at position %d of commit log %s, with bad position but valid CRC", + offset, reader.getPath()); return -1; } return end; @@ -268,8 +271,7 @@ public class CommitLogReplayer } } - @SuppressWarnings("resource") - public void recover(File file) throws IOException + public void recover(File file, boolean tolerateTruncation) throws IOException { CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath())); @@ -281,7 +283,7 @@ public class CommitLogReplayer return; if (globalPosition.segment == desc.id) reader.seek(globalPosition.position); - replaySyncSection(reader, (int) reader.getPositionLimit(), desc); + replaySyncSection(reader, (int) reader.getPositionLimit(), desc, desc.fileName(), tolerateTruncation); return; } @@ -295,10 +297,15 @@ public class CommitLogReplayer desc = null; } if (desc == null) { - logger.warn("Could not read commit log descriptor in file {}", file); + handleReplayError(false, "Could not read commit log descriptor in file %s", file); return; } - assert segmentId == desc.id; + if (segmentId != desc.id) + { + handleReplayError(false, "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentId, desc.id, file); + // continue processing if ignored. 
+ } + if (logAndCheckIfShouldSkip(file, desc)) return; @@ -311,7 +318,7 @@ public class CommitLogReplayer } catch (ConfigurationException e) { - logger.warn("Unknown compression: {}", e.getMessage()); + handleReplayError(false, "Unknown compression: %s", e.getMessage()); return; } } @@ -320,7 +327,7 @@ public class CommitLogReplayer int end = (int) reader.getFilePointer(); int replayEnd = end; - while ((end = readSyncMarker(desc, end, reader)) >= 0) + while ((end = readSyncMarker(desc, end, reader, tolerateTruncation)) >= 0) { int replayPos = replayEnd + CommitLogSegment.SYNC_MARKER_SIZE; @@ -340,11 +347,17 @@ public class CommitLogReplayer continue; FileDataInput sectionReader = reader; + String errorContext = desc.fileName(); + // In the uncompressed case the last non-fully-flushed section can be anywhere in the file. + boolean tolerateErrorsInSection = tolerateTruncation; if (compressor != null) { + // In the compressed case we know if this is the last section. + tolerateErrorsInSection &= end == reader.length() || end < 0; + + int start = (int) reader.getFilePointer(); try { - int start = (int) reader.getFilePointer(); int compressedLength = end - start; if (logger.isDebugEnabled()) logger.trace("Decompressing {} between replay positions {} and {}", @@ -359,15 +372,18 @@ public class CommitLogReplayer uncompressedBuffer = new byte[(int) (1.2 * uncompressedLength)]; compressedLength = compressor.uncompress(buffer, 0, compressedLength, uncompressedBuffer, 0); sectionReader = new ByteBufferDataInput(ByteBuffer.wrap(uncompressedBuffer), reader.getPath(), replayPos, 0); + errorContext = "compressed section at " + start + " in " + errorContext; } - catch (IOException e) + catch (IOException | ArrayIndexOutOfBoundsException e) { - logger.error("Unexpected exception decompressing section {}", e); + handleReplayError(tolerateErrorsInSection, + "Unexpected exception decompressing section at %d: %s", + start, e); continue; } } - if (!replaySyncSection(sectionReader, 
replayEnd, desc)) + if (!replaySyncSection(sectionReader, replayEnd, desc, errorContext, tolerateErrorsInSection)) break; } } @@ -399,13 +415,14 @@ public class CommitLogReplayer * * @return Whether replay should continue with the next section. */ - private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException + private boolean replaySyncSection(FileDataInput reader, int end, CommitLogDescriptor desc, String errorContext, boolean tolerateErrors) throws IOException { /* read the logs populate Mutation and apply */ while (reader.getFilePointer() < end && !reader.isEOF()) { + long mutationStart = reader.getFilePointer(); if (logger.isDebugEnabled()) - logger.trace("Reading mutation at {}", reader.getFilePointer()); + logger.trace("Reading mutation at {}", mutationStart); long claimedCRC32; int serializedSize; @@ -424,7 +441,12 @@ public class CommitLogReplayer // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 if (serializedSize < 10) + { + handleReplayError(tolerateErrors, + "Invalid mutation size %d at %d in %s", + serializedSize, mutationStart, errorContext); return false; + } long claimedSizeChecksum; if (desc.version < CommitLogDescriptor.VERSION_21) @@ -438,7 +460,12 @@ public class CommitLogReplayer checksum.updateInt(serializedSize); if (checksum.getValue() != claimedSizeChecksum) + { + handleReplayError(tolerateErrors, + "Mutation size checksum failure at %d in %s", + mutationStart, errorContext); return false; + } // ok. if (serializedSize > buffer.length) @@ -451,14 +478,18 @@ public class CommitLogReplayer } catch (EOFException eof) { + handleReplayError(tolerateErrors, + "Unexpected end of segment", + mutationStart, errorContext); return false; // last CL entry didn't get completely written. that's ok. 
} checksum.update(buffer, 0, serializedSize); if (claimedCRC32 != checksum.getValue()) { - // this entry must not have been fsynced. probably the rest is bad too, - // but just in case there is no harm in trying them (since we still read on an entry boundary) + handleReplayError(tolerateErrors, + "Mutation checksum failure at %d in %s", + mutationStart, errorContext); continue; } replayMutation(buffer, serializedSize, reader.getFilePointer(), desc); @@ -508,9 +539,13 @@ public class CommitLogReplayer out.write(inputBuffer, 0, size); } - String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. Exception follows: ", - f.getAbsolutePath()); - logger.error(st, t); + // Checksum passed so this error can't be permissible. + handleReplayError(false, + "Unexpected error deserializing mutation; saved to %s. " + + "This may be caused by replaying a mutation against a table with the same name but incompatible schema. " + + "Exception follows: %s", + f.getAbsolutePath(), + t); return; } @@ -578,4 +613,35 @@ public class CommitLogReplayer } return false; } + + static void handleReplayError(boolean permissible, String message, Object... messageArgs) throws IOException + { + String msg = String.format(message, messageArgs); + IOException e = new CommitLogReplayException(msg); + if (permissible) + logger.error("Ignoring commit log replay error likely due to incomplete flush to disk", e); + else if (Boolean.getBoolean(IGNORE_REPLAY_ERRORS_PROPERTY)) + logger.error("Ignoring commit log replay error", e); + else if (!CommitLog.handleCommitError("Failed commit log replay", e)) + { + logger.error("Replay stopped. 
If you wish to override this error and continue starting the node ignoring " + + "commit log replay problems, specify -D" + IGNORE_REPLAY_ERRORS_PROPERTY + "=true " + + "on the command line"); + throw e; + } + } + + @SuppressWarnings("serial") + public static class CommitLogReplayException extends IOException + { + public CommitLogReplayException(String message, Throwable cause) + { + super(message, cause); + } + + public CommitLogReplayException(String message) + { + super(message); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log new file mode 100644 index 0000000..d248d59 Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot/CommitLog-5-1438186885380.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt new file mode 100644 index 0000000..c4d8fe7 --- /dev/null +++ b/test/data/legacy-commitlog/2.2-lz4-bitrot/hash.txt @@ -0,0 +1,6 @@ +#CommitLog bitrot test, version 2.2.0-SNAPSHOT +#This is a copy of 2.2-lz4 with some overwritten bytes. +#Replaying this should result in an error which can be overridden. 
+cells=6051 +hash=-170208326 +cfid=dc32ce20-360d-11e5-826c-afadad37221d http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log new file mode 100644 index 0000000..083d65c Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-bitrot2/CommitLog-5-1438186885380.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt new file mode 100644 index 0000000..c49dda0 --- /dev/null +++ b/test/data/legacy-commitlog/2.2-lz4-bitrot2/hash.txt @@ -0,0 +1,6 @@ +#CommitLog upgrade test, version 2.2.0-SNAPSHOT +#This is a copy of 2.2-lz4 with some overwritten bytes. +#Replaying this should result in an error which can be overridden. 
+cells=6037 +hash=-1312748407 +cfid=dc32ce20-360d-11e5-826c-afadad37221d http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log new file mode 100644 index 0000000..939d408 Binary files /dev/null and b/test/data/legacy-commitlog/2.2-lz4-truncated/CommitLog-5-1438186885380.log differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt ---------------------------------------------------------------------- diff --git a/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt new file mode 100644 index 0000000..ce7f600 --- /dev/null +++ b/test/data/legacy-commitlog/2.2-lz4-truncated/hash.txt @@ -0,0 +1,5 @@ +#Truncated CommitLog test. +#This is a copy of 2.2-lz4 with the last 50 bytes deleted. +cells=6037 +hash=-889057729 +cfid=dc32ce20-360d-11e5-826c-afadad37221d http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java deleted file mode 100644 index 0ecab3c..0000000 --- a/test/unit/org/apache/cassandra/db/CommitLogFailurePolicyTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. 
The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.cassandra.db; - -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.service.CassandraDaemon; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.KillerForTests; - -public class CommitLogFailurePolicyTest -{ - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - SchemaLoader.prepareServer(); - System.setProperty("cassandra.commitlog.stop_on_errors", "true"); - } - - @Test - public void testCommitFailurePolicy_stop() throws ConfigurationException - { - CassandraDaemon daemon = new CassandraDaemon(); - daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy - StorageService.instance.registerDaemon(daemon); - - // Need storage service active so stop policy can shutdown gossip - StorageService.instance.initServer(); - Assert.assertTrue(Gossiper.instance.isEnabled()); - - Config.CommitFailurePolicy oldPolicy = 
DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); - CommitLog.handleCommitError("Test stop error", new Throwable()); - Assert.assertFalse(Gossiper.instance.isEnabled()); - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - } - } - - @Test - public void testCommitFailurePolicy_die() - { - CassandraDaemon daemon = new CassandraDaemon(); - daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy - StorageService.instance.registerDaemon(daemon); - - KillerForTests killerForTests = new KillerForTests(); - JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); - Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); - CommitLog.handleCommitError("Testing die policy", new Throwable()); - Assert.assertTrue(killerForTests.wasKilled()); - Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - JVMStabilityInspector.replaceKiller(originalKiller); - } - } - - @Test - public void testCommitFailurePolicy_ignore_beforeStartup() - { - //startup was not completed successfuly (since method completeSetup() was not called) - CassandraDaemon daemon = new CassandraDaemon(); - StorageService.instance.registerDaemon(daemon); - - KillerForTests killerForTests = new KillerForTests(); - JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); - Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); - CommitLog.handleCommitError("Testing ignore policy", new Throwable()); - //even though policy 
is ignore, JVM must die because Daemon has not finished initializing - Assert.assertTrue(killerForTests.wasKilled()); - Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - JVMStabilityInspector.replaceKiller(originalKiller); - } - } - - @Test - public void testCommitFailurePolicy_ignore_afterStartup() throws Exception - { - CassandraDaemon daemon = new CassandraDaemon(); - daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy - StorageService.instance.registerDaemon(daemon); - - KillerForTests killerForTests = new KillerForTests(); - JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); - Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); - try - { - DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); - CommitLog.handleCommitError("Testing ignore policy", new Throwable()); - //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup - Assert.assertFalse(killerForTests.wasKilled()); - } - finally - { - DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); - JVMStabilityInspector.replaceKiller(originalKiller); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/CommitLogTest.java b/test/unit/org/apache/cassandra/db/CommitLogTest.java deleted file mode 100644 index 2764da4..0000000 --- a/test/unit/org/apache/cassandra/db/CommitLogTest.java +++ /dev/null @@ -1,421 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. 
See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, -* software distributed under the License is distributed on an -* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -* KIND, either express or implied. See the License for the -* specific language governing permissions and limitations -* under the License. -*/ - -package org.apache.cassandra.db; - -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutionException; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import com.google.common.collect.ImmutableMap; - -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.Util; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.commitlog.CommitLog; -import org.apache.cassandra.db.commitlog.CommitLogDescriptor; -import org.apache.cassandra.db.commitlog.CommitLogSegmentManager; -import org.apache.cassandra.db.commitlog.ReplayPosition; -import org.apache.cassandra.db.commitlog.CommitLogSegment; -import org.apache.cassandra.db.compaction.CompactionManager; -import 
org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.gms.Gossiper; -import org.apache.cassandra.io.util.ByteBufferDataInput; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.locator.SimpleStrategy; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.CassandraDaemon; -import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.JVMStabilityInspector; -import org.apache.cassandra.utils.KillerForTests; - -public class CommitLogTest -{ - private static final String KEYSPACE1 = "CommitLogTest"; - private static final String KEYSPACE2 = "CommitLogTestNonDurable"; - private static final String CF1 = "Standard1"; - private static final String CF2 = "Standard2"; - - @BeforeClass - public static void defineSchema() throws ConfigurationException - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF1), - SchemaLoader.standardCFMD(KEYSPACE1, CF2)); - SchemaLoader.createKeyspace(KEYSPACE2, - false, - true, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF1), - SchemaLoader.standardCFMD(KEYSPACE1, CF2)); - System.setProperty("cassandra.commitlog.stop_on_errors", "true"); - CompactionManager.instance.disableAutoCompaction(); - } - - @Test - public void testRecoveryWithEmptyLog() throws Exception - { - CommitLog.instance.recover(new File[]{ tmpFile() }); - } - - @Test - public void testRecoveryWithShortLog() throws Exception - { - // force EOF while reading log - testRecoveryWithBadSizeArgument(100, 10); - } - - @Test - public void 
testRecoveryWithShortSize() throws Exception - { - testRecovery(new byte[2]); - } - - @Test - public void testRecoveryWithShortCheckSum() throws Exception - { - testRecovery(new byte[6]); - } - - @Test - public void testRecoveryWithGarbageLog() throws Exception - { - byte[] garbage = new byte[100]; - (new java.util.Random()).nextBytes(garbage); - testRecovery(garbage); - } - - @Test - public void testRecoveryWithBadSizeChecksum() throws Exception - { - Checksum checksum = new CRC32(); - checksum.update(100); - testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue()); - } - - @Test - public void testRecoveryWithZeroSegmentSizeArgument() throws Exception - { - // many different combinations of 4 bytes (garbage) will be read as zero by readInt() - testRecoveryWithBadSizeArgument(0, 10); // zero size, but no EOF - } - - @Test - public void testRecoveryWithNegativeSizeArgument() throws Exception - { - // garbage from a partial/bad flush could be read as a negative size even if there is no EOF - testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF - } - - @Test - public void testDontDeleteIfDirty() throws Exception - { - CommitLog.instance.resetUnsafe(true); - // Roughly 32 MB mutation - Mutation rm = new Mutation(KEYSPACE1, bytes("k")); - rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0); - - // Adding it 5 times - CommitLog.instance.add(rm); - CommitLog.instance.add(rm); - CommitLog.instance.add(rm); - CommitLog.instance.add(rm); - CommitLog.instance.add(rm); - - // Adding new mutation on another CF - Mutation rm2 = new Mutation(KEYSPACE1, bytes("k")); - rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0); - CommitLog.instance.add(rm2); - - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); - - UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, 
CommitLog.instance.getContext()); - - // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); - } - - @Test - public void testDeleteIfNotDirty() throws Exception - { - DatabaseDescriptor.getCommitLogSegmentSize(); - CommitLog.instance.resetUnsafe(true); - // Roughly 32 MB mutation - Mutation rm = new Mutation(KEYSPACE1, bytes("k")); - rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0); - - // Adding it twice (won't change segment) - CommitLog.instance.add(rm); - CommitLog.instance.add(rm); - - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); - - // "Flush": this won't delete anything - UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); - CommitLog.instance.sync(true); - CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext()); - - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); - - // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created - Mutation rm2 = new Mutation(KEYSPACE1, bytes("k")); - rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 200), 0); - CommitLog.instance.add(rm2); - // also forces a new segment, since each entry-with-overhead is just under half the CL size - CommitLog.instance.add(rm2); - CommitLog.instance.add(rm2); - - assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments(); - - - // "Flush" second cf: The first segment should be deleted since we - // didn't write anything on cf1 since last flush (and we flush cf2) - - UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); - CommitLog.instance.discardCompletedSegments(cfid2, 
CommitLog.instance.getContext()); - - // Assert we still have both our segment - assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); - } - - private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column) - { - Mutation rm = new Mutation(KEYSPACE1, bytes("k")); - rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0); - - int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2); - max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead - return max - (int) Mutation.serializer.serializedSize(rm, MessagingService.current_version); - } - - private static int getMaxRecordDataSize() - { - return getMaxRecordDataSize(KEYSPACE1, bytes("k"), CF1, Util.cellname("c1")); - } - - // CASSANDRA-3615 - @Test - public void testEqualRecordLimit() throws Exception - { - CommitLog.instance.resetUnsafe(true); - - Mutation rm = new Mutation(KEYSPACE1, bytes("k")); - rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0); - CommitLog.instance.add(rm); - } - - @Test - public void testExceedRecordLimit() throws Exception - { - CommitLog.instance.resetUnsafe(true); - try - { - Mutation rm = new Mutation(KEYSPACE1, bytes("k")); - rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(1 + getMaxRecordDataSize()), 0); - CommitLog.instance.add(rm); - throw new AssertionError("mutation larger than limit was accepted"); - } - catch (IllegalArgumentException e) - { - // IAE is thrown on too-large mutations - } - } - - protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception - { - Checksum checksum = new CRC32(); - checksum.update(size); - testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue()); - } - - protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception - { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - DataOutputStream dout = new 
DataOutputStream(out); - dout.writeInt(size); - dout.writeLong(checksum); - dout.write(new byte[dataSize]); - dout.close(); - testRecovery(out.toByteArray()); - } - - protected File tmpFile() throws IOException - { - File logFile = File.createTempFile("CommitLog-" + CommitLogDescriptor.current_version + "-", ".log"); - logFile.deleteOnExit(); - assert logFile.length() == 0; - return logFile; - } - - protected void testRecovery(byte[] logData) throws Exception - { - File logFile = tmpFile(); - try (OutputStream lout = new FileOutputStream(logFile)) - { - lout.write(logData); - //statics make it annoying to test things correctly - CommitLog.instance.recover(new File[]{ logFile }); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ - } - } - - @Test - public void testVersions() - { - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); - Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); - Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); - - Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); - - Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion()); - String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; - Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); - } - - @Test - public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException - { - CommitLog.instance.resetUnsafe(true); - boolean prev = DatabaseDescriptor.isAutoSnapshot(); - DatabaseDescriptor.setAutoSnapshot(false); - 
ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); - ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"); - - final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k")); - rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0); - rm1.apply(); - cfs1.truncateBlocking(); - DatabaseDescriptor.setAutoSnapshot(prev); - final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k")); - rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0); - - for (int i = 0 ; i < 5 ; i++) - CommitLog.instance.add(rm2); - - Assert.assertEquals(2, CommitLog.instance.activeSegments()); - ReplayPosition position = CommitLog.instance.getContext(); - for (Keyspace ks : Keyspace.system()) - for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) - CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position); - CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position); - Assert.assertEquals(1, CommitLog.instance.activeSegments()); - } - - @Test - public void testTruncateWithoutSnapshotNonDurable() throws IOException - { - CommitLog.instance.resetUnsafe(true); - boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot(); - DatabaseDescriptor.setAutoSnapshot(false); - Keyspace notDurableKs = Keyspace.open(KEYSPACE2); - Assert.assertFalse(notDurableKs.getMetadata().durableWrites); - ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); - CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator(); - Mutation rm; - DecoratedKey dk = Util.dk("key1"); - - // add data - rm = new Mutation(KEYSPACE2, dk.getKey()); - rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0); - rm.apply(); - - ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), 
type))); - Row row = command.getRow(notDurableKs); - Cell col = row.cf.getColumn(Util.cellname("Column1")); - Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes())); - cfs.truncateBlocking(); - DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot); - row = command.getRow(notDurableKs); - Assert.assertEquals(null, row.cf); - } - - private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException - { - ByteBuffer buf = ByteBuffer.allocate(1024); - CommitLogDescriptor.writeHeader(buf, desc); - long length = buf.position(); - // Put some extra data in the stream. - buf.putDouble(0.1); - buf.flip(); - FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0); - CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); - Assert.assertEquals("Descriptor length", length, input.getFilePointer()); - Assert.assertEquals("Descriptors", desc, read); - } - - @Test - public void testDescriptorPersistence() throws IOException - { - testDescriptorPersistence(new CommitLogDescriptor(11, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null)); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null))); - testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19, - new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); - } - - @Test - public void testDescriptorInvalidParametersSize() throws IOException - { - Map params = new HashMap<>(); - for (int i=0; i<65535; ++i) - params.put("key"+i, Integer.toString(i, 16)); - try { - CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, - 21, - new ParameterizedClass("LZ4Compressor", params)); - ByteBuffer buf = ByteBuffer.allocate(1024000); - 
CommitLogDescriptor.writeHeader(buf, desc); - Assert.fail("Parameter object too long should fail on writing descriptor."); - } catch (ConfigurationException e) - { - // correct path - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java new file mode 100644 index 0000000..bde8ca3 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogFailurePolicyTest.java @@ -0,0 +1,140 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package org.apache.cassandra.db.commitlog; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.service.CassandraDaemon; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.KillerForTests; + +public class CommitLogFailurePolicyTest +{ + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + } + + @Test + public void testCommitFailurePolicy_stop() throws ConfigurationException + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + // Need storage service active so stop policy can shutdown gossip + StorageService.instance.initServer(); + Assert.assertTrue(Gossiper.instance.isEnabled()); + + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.stop); + CommitLog.handleCommitError("Test stop error", new Throwable()); + Assert.assertFalse(Gossiper.instance.isEnabled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + } + } + + @Test + public void testCommitFailurePolicy_die() + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new 
KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.die); + CommitLog.handleCommitError("Testing die policy", new Throwable()); + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertFalse(killerForTests.wasKilledQuietly()); //only killed quietly on startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitFailurePolicy_ignore_beforeStartup() + { + //startup was not completed successfuly (since method completeSetup() was not called) + CassandraDaemon daemon = new CassandraDaemon(); + StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + CommitLog.handleCommitError("Testing ignore policy", new Throwable()); + //even though policy is ignore, JVM must die because Daemon has not finished initializing + Assert.assertTrue(killerForTests.wasKilled()); + Assert.assertTrue(killerForTests.wasKilledQuietly()); //killed quietly due to startup failure + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } + + @Test + public void testCommitFailurePolicy_ignore_afterStartup() throws Exception + { + CassandraDaemon daemon = new CassandraDaemon(); + daemon.completeSetup(); //startup must be completed, otherwise commit log failure must kill JVM regardless of failure policy + 
StorageService.instance.registerDaemon(daemon); + + KillerForTests killerForTests = new KillerForTests(); + JVMStabilityInspector.Killer originalKiller = JVMStabilityInspector.replaceKiller(killerForTests); + Config.CommitFailurePolicy oldPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try + { + DatabaseDescriptor.setCommitFailurePolicy(Config.CommitFailurePolicy.ignore); + CommitLog.handleCommitError("Testing ignore policy", new Throwable()); + //error policy is set to IGNORE, so JVM must not be killed if error ocurs after startup + Assert.assertFalse(killerForTests.wasKilled()); + } + finally + { + DatabaseDescriptor.setCommitFailurePolicy(oldPolicy); + JVMStabilityInspector.replaceKiller(originalKiller); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java new file mode 100644 index 0000000..da8058c --- /dev/null +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -0,0 +1,535 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. 
See the License for the +* specific language governing permissions and limitations +* under the License. +*/ + +package org.apache.cassandra.db.commitlog; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.zip.CRC32; +import java.util.zip.Checksum; + +import com.google.common.collect.ImmutableMap; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.Config.CommitFailurePolicy; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.config.ParameterizedClass; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.Row; +import org.apache.cassandra.db.SliceByNamesReadCommand; +import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.CellNameType; +import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.ByteBufferDataInput; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.net.MessagingService; +import 
org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +public class CommitLogTest +{ + private static final String KEYSPACE1 = "CommitLogTest"; + private static final String KEYSPACE2 = "CommitLogTestNonDurable"; + private static final String CF1 = "Standard1"; + private static final String CF2 = "Standard2"; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF1), + SchemaLoader.standardCFMD(KEYSPACE1, CF2)); + SchemaLoader.createKeyspace(KEYSPACE2, + false, + true, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF1), + SchemaLoader.standardCFMD(KEYSPACE1, CF2)); + CompactionManager.instance.disableAutoCompaction(); + } + + @Test(expected = CommitLogReplayException.class) + public void testRecoveryWithEmptyLog() throws Exception + { + CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) }); + } + + @Test + public void testRecoveryWithEmptyLog20() throws Exception + { + CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) }); + } + + @Test + public void testRecoveryWithZeroLog() throws Exception + { + testRecovery(new byte[10], null); + } + + @Test + public void testRecoveryWithShortLog() throws Exception + { + // force EOF while reading log + testRecoveryWithBadSizeArgument(100, 10); + } + + @Test(expected = CommitLogReplayException.class) + public void testRecoveryWithShortSize() throws Exception + { + testRecovery(new byte[2], CommitLogDescriptor.VERSION_20); + } + + @Test + public void testRecoveryWithShortCheckSum() throws Exception + { + byte[] data = new byte[8]; + data[3] = 10; // make sure this is not a legacy end marker. 
+ testRecovery(data, CommitLogReplayException.class); + } + + @Test + public void testRecoveryWithShortMutationSize() throws Exception + { + testRecoveryWithBadSizeArgument(9, 10); + } + + private void testRecoveryWithGarbageLog() throws Exception + { + byte[] garbage = new byte[100]; + (new java.util.Random()).nextBytes(garbage); + testRecovery(garbage, CommitLogDescriptor.current_version); + } + + @Test(expected = CommitLogReplayException.class) + public void testRecoveryWithGarbageLog_fail() throws Exception + { + testRecoveryWithGarbageLog(); + } + + @Test + public void testRecoveryWithGarbageLog_ignoredByProperty() throws Exception + { + try { + System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); + testRecoveryWithGarbageLog(); + } finally { + System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); + } + } + + @Test + public void testRecoveryWithGarbageLog_ignoredByPolicy() throws Exception + { + CommitFailurePolicy existingPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try { + DatabaseDescriptor.setCommitFailurePolicy(CommitFailurePolicy.ignore); + testRecoveryWithGarbageLog(); + } finally { + DatabaseDescriptor.setCommitFailurePolicy(existingPolicy); + } + } + + @Test + public void testRecoveryWithBadSizeChecksum() throws Exception + { + Checksum checksum = new CRC32(); + checksum.update(100); + testRecoveryWithBadSizeArgument(100, 100, ~checksum.getValue()); + } + + @Test + public void testRecoveryWithNegativeSizeArgument() throws Exception + { + // garbage from a partial/bad flush could be read as a negative size even if there is no EOF + testRecoveryWithBadSizeArgument(-10, 10); // negative size, but no EOF + } + + @Test + public void testDontDeleteIfDirty() throws Exception + { + CommitLog.instance.resetUnsafe(true); + // Roughly 32 MB mutation + Mutation rm = new Mutation(KEYSPACE1, bytes("k")); + rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize()/4), 0); + + 
// Adding it 5 times + CommitLog.instance.add(rm); + CommitLog.instance.add(rm); + CommitLog.instance.add(rm); + CommitLog.instance.add(rm); + CommitLog.instance.add(rm); + + // Adding new mutation on another CF + Mutation rm2 = new Mutation(KEYSPACE1, bytes("k")); + rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate(4), 0); + CommitLog.instance.add(rm2); + + assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + + UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); + CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); + + // Assert we still have both our segment + assert CommitLog.instance.activeSegments() == 2 : "Expecting 2 segments, got " + CommitLog.instance.activeSegments(); + } + + @Test + public void testDeleteIfNotDirty() throws Exception + { + DatabaseDescriptor.getCommitLogSegmentSize(); + CommitLog.instance.resetUnsafe(true); + // Roughly 32 MB mutation + Mutation rm = new Mutation(KEYSPACE1, bytes("k")); + rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/4) - 1), 0); + + // Adding it twice (won't change segment) + CommitLog.instance.add(rm); + CommitLog.instance.add(rm); + + assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + + // "Flush": this won't delete anything + UUID cfid1 = rm.getColumnFamilyIds().iterator().next(); + CommitLog.instance.sync(true); + CommitLog.instance.discardCompletedSegments(cfid1, CommitLog.instance.getContext()); + + assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + + // Adding new mutation on another CF, large enough (including CL entry overhead) that a new segment is created + Mutation rm2 = new Mutation(KEYSPACE1, bytes("k")); + rm2.add(CF2, Util.cellname("c1"), ByteBuffer.allocate((DatabaseDescriptor.getCommitLogSegmentSize()/2) - 
200), 0); + CommitLog.instance.add(rm2); + // also forces a new segment, since each entry-with-overhead is just under half the CL size + CommitLog.instance.add(rm2); + CommitLog.instance.add(rm2); + + assert CommitLog.instance.activeSegments() == 3 : "Expecting 3 segments, got " + CommitLog.instance.activeSegments(); + + + // "Flush" second cf: The first segment should be deleted since we + // didn't write anything on cf1 since last flush (and we flush cf2) + + UUID cfid2 = rm2.getColumnFamilyIds().iterator().next(); + CommitLog.instance.discardCompletedSegments(cfid2, CommitLog.instance.getContext()); + + // Assert we still have both our segment + assert CommitLog.instance.activeSegments() == 1 : "Expecting 1 segment, got " + CommitLog.instance.activeSegments(); + } + + private static int getMaxRecordDataSize(String keyspace, ByteBuffer key, String table, CellName column) + { + Mutation rm = new Mutation(KEYSPACE1, bytes("k")); + rm.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(0), 0); + + int max = (DatabaseDescriptor.getCommitLogSegmentSize() / 2); + max -= CommitLogSegment.ENTRY_OVERHEAD_SIZE; // log entry overhead + return max - (int) Mutation.serializer.serializedSize(rm, MessagingService.current_version); + } + + private static int getMaxRecordDataSize() + { + return getMaxRecordDataSize(KEYSPACE1, bytes("k"), CF1, Util.cellname("c1")); + } + + // CASSANDRA-3615 + @Test + public void testEqualRecordLimit() throws Exception + { + CommitLog.instance.resetUnsafe(true); + + Mutation rm = new Mutation(KEYSPACE1, bytes("k")); + rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(getMaxRecordDataSize()), 0); + CommitLog.instance.add(rm); + } + + @Test + public void testExceedRecordLimit() throws Exception + { + CommitLog.instance.resetUnsafe(true); + try + { + Mutation rm = new Mutation(KEYSPACE1, bytes("k")); + rm.add(CF1, Util.cellname("c1"), ByteBuffer.allocate(1 + getMaxRecordDataSize()), 0); + CommitLog.instance.add(rm); + throw new 
AssertionError("mutation larger than limit was accepted"); + } + catch (IllegalArgumentException e) + { + // IAE is thrown on too-large mutations + } + } + + protected void testRecoveryWithBadSizeArgument(int size, int dataSize) throws Exception + { + Checksum checksum = new CRC32(); + checksum.update(size); + testRecoveryWithBadSizeArgument(size, dataSize, checksum.getValue()); + } + + protected void testRecoveryWithBadSizeArgument(int size, int dataSize, long checksum) throws Exception + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dout = new DataOutputStream(out); + dout.writeInt(size); + dout.writeLong(checksum); + dout.write(new byte[dataSize]); + dout.close(); + testRecovery(out.toByteArray(), CommitLogReplayException.class); + } + + protected File tmpFile(int version) throws IOException + { + File logFile = File.createTempFile("CommitLog-" + version + "-", ".log"); + logFile.deleteOnExit(); + assert logFile.length() == 0; + return logFile; + } + + protected Void testRecovery(byte[] logData, int version) throws Exception + { + File logFile = tmpFile(version); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(logData); + //statics make it annoying to test things correctly + CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + } + return null; + } + + protected Void testRecovery(CommitLogDescriptor desc, byte[] logData) throws Exception + { + File logFile = tmpFile(desc.version); + CommitLogDescriptor fromFile = CommitLogDescriptor.fromFileName(logFile.getName()); + // Change id to match file. 
+ desc = new CommitLogDescriptor(desc.version, fromFile.id, desc.compression); + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(buf.array(), 0, buf.position()); + lout.write(logData); + //statics make it annoying to test things correctly + CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + } + return null; + } + + @Test(expected = CommitLogReplayException.class) + public void testRecoveryWithIdMismatch() throws Exception + { + CommitLogDescriptor desc = new CommitLogDescriptor(4, null); + File logFile = tmpFile(desc.version); + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + try (OutputStream lout = new FileOutputStream(logFile)) + { + lout.write(buf.array(), 0, buf.position()); + //statics make it annoying to test things correctly + CommitLog.instance.recover(logFile.getPath()); //CASSANDRA-1119 / CASSANDRA-1179 throw on failure*/ + } + } + + @Test(expected = CommitLogReplayException.class) + public void testRecoveryWithBadCompressor() throws Exception + { + CommitLogDescriptor desc = new CommitLogDescriptor(4, new ParameterizedClass("UnknownCompressor", null)); + testRecovery(desc, new byte[0]); + } + + protected void runExpecting(Callable r, Class expected) + { + Throwable caught = null; + try + { + r.call(); + } + catch (Throwable t) + { + if (expected != t.getClass()) + throw new AssertionError("Expected exception " + expected + ", got " + t, t); + caught = t; + } + if (expected != null && caught == null) + Assert.fail("Expected exception " + expected + " but call completed successfully."); + } + + protected void testRecovery(final byte[] logData, Class expected) throws Exception + { + runExpecting(new Callable() { + public Void call() throws Exception + { + return testRecovery(logData, CommitLogDescriptor.VERSION_20); + } + }, expected); + 
runExpecting(new Callable() { + public Void call() throws Exception + { + return testRecovery(new CommitLogDescriptor(4, null), logData); + } + }, expected); + } + + @Test + public void testVersions() + { + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-1340512736956320000.log")); + Assert.assertTrue(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog--2-1340512736956320000.log")); + Assert.assertFalse(CommitLogDescriptor.isValid("CommitLog-2-1340512736956320000-123.log")); + + Assert.assertEquals(1340512736956320000L, CommitLogDescriptor.fromFileName("CommitLog-2-1340512736956320000.log").id); + + Assert.assertEquals(MessagingService.current_version, new CommitLogDescriptor(1340512736956320000L, null).getMessagingVersion()); + String newCLName = "CommitLog-" + CommitLogDescriptor.current_version + "-1340512736956320000.log"; + Assert.assertEquals(MessagingService.current_version, CommitLogDescriptor.fromFileName(newCLName).getMessagingVersion()); + } + + @Test + public void testTruncateWithoutSnapshot() throws ExecutionException, InterruptedException, IOException + { + CommitLog.instance.resetUnsafe(true); + boolean prev = DatabaseDescriptor.isAutoSnapshot(); + DatabaseDescriptor.setAutoSnapshot(false); + ColumnFamilyStore cfs1 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard1"); + ColumnFamilyStore cfs2 = Keyspace.open(KEYSPACE1).getColumnFamilyStore("Standard2"); + + final Mutation rm1 = new Mutation(KEYSPACE1, bytes("k")); + rm1.add("Standard1", Util.cellname("c1"), ByteBuffer.allocate(100), 0); + rm1.apply(); + cfs1.truncateBlocking(); + DatabaseDescriptor.setAutoSnapshot(prev); + final Mutation rm2 = new Mutation(KEYSPACE1, bytes("k")); + rm2.add("Standard2", Util.cellname("c1"), ByteBuffer.allocate(DatabaseDescriptor.getCommitLogSegmentSize() / 4), 0); + + for (int i = 0 ; i < 
5 ; i++) + CommitLog.instance.add(rm2); + + Assert.assertEquals(2, CommitLog.instance.activeSegments()); + ReplayPosition position = CommitLog.instance.getContext(); + for (Keyspace ks : Keyspace.system()) + for (ColumnFamilyStore syscfs : ks.getColumnFamilyStores()) + CommitLog.instance.discardCompletedSegments(syscfs.metadata.cfId, position); + CommitLog.instance.discardCompletedSegments(cfs2.metadata.cfId, position); + Assert.assertEquals(1, CommitLog.instance.activeSegments()); + } + + @Test + public void testTruncateWithoutSnapshotNonDurable() throws IOException + { + CommitLog.instance.resetUnsafe(true); + boolean prevAutoSnapshot = DatabaseDescriptor.isAutoSnapshot(); + DatabaseDescriptor.setAutoSnapshot(false); + Keyspace notDurableKs = Keyspace.open(KEYSPACE2); + Assert.assertFalse(notDurableKs.getMetadata().durableWrites); + ColumnFamilyStore cfs = notDurableKs.getColumnFamilyStore("Standard1"); + CellNameType type = notDurableKs.getColumnFamilyStore("Standard1").getComparator(); + Mutation rm; + DecoratedKey dk = Util.dk("key1"); + + // add data + rm = new Mutation(KEYSPACE2, dk.getKey()); + rm.add("Standard1", Util.cellname("Column1"), ByteBufferUtil.bytes("abcd"), 0); + rm.apply(); + + ReadCommand command = new SliceByNamesReadCommand(KEYSPACE2, dk.getKey(), "Standard1", System.currentTimeMillis(), new NamesQueryFilter(FBUtilities.singleton(Util.cellname("Column1"), type))); + Row row = command.getRow(notDurableKs); + Cell col = row.cf.getColumn(Util.cellname("Column1")); + Assert.assertEquals(col.value(), ByteBuffer.wrap("abcd".getBytes())); + cfs.truncateBlocking(); + DatabaseDescriptor.setAutoSnapshot(prevAutoSnapshot); + row = command.getRow(notDurableKs); + Assert.assertEquals(null, row.cf); + } + + private void testDescriptorPersistence(CommitLogDescriptor desc) throws IOException + { + ByteBuffer buf = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buf, desc); + long length = buf.position(); + // Put some extra data in the stream. 
+ buf.putDouble(0.1); + buf.flip(); + FileDataInput input = new ByteBufferDataInput(buf, "input", 0, 0); + CommitLogDescriptor read = CommitLogDescriptor.readHeader(input); + Assert.assertEquals("Descriptor length", length, input.getFilePointer()); + Assert.assertEquals("Descriptors", desc, read); + } + + @Test + public void testDescriptorPersistence() throws IOException + { + testDescriptorPersistence(new CommitLogDescriptor(11, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_21, 13, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 15, null)); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 17, new ParameterizedClass("LZ4Compressor", null))); + testDescriptorPersistence(new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, 19, + new ParameterizedClass("StubbyCompressor", ImmutableMap.of("parameter1", "value1", "flag2", "55", "argument3", "null")))); + } + + @Test + public void testDescriptorInvalidParametersSize() throws IOException + { + Map<String, String> params = new HashMap<>(); + for (int i=0; i<6000; ++i) + params.put("key"+i, Integer.toString(i, 16)); + try { + CommitLogDescriptor desc = new CommitLogDescriptor(CommitLogDescriptor.VERSION_22, + 21, + new ParameterizedClass("LZ4Compressor", params)); + ByteBuffer buf = ByteBuffer.allocate(1024000); + CommitLogDescriptor.writeHeader(buf, desc); + Assert.fail("Parameter object too long should fail on writing descriptor."); + } catch (ConfigurationException e) + { + // correct path + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fce8478f/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java index 1655078..2031475 100644 --- 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogUpgradeTest.java @@ -36,10 +36,13 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.config.Config.CommitFailurePolicy; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.commitlog.CommitLogReplayer.CommitLogReplayException; public class CommitLogUpgradeTest { @@ -65,6 +68,58 @@ public class CommitLogUpgradeTest testRestore(DATA_DIR + "2.1"); } + @Test + public void test22_truncated() throws Exception + { + testRestore(DATA_DIR + "2.2-lz4-truncated"); + } + + @Test(expected = CommitLogReplayException.class) + public void test22_bitrot() throws Exception + { + testRestore(DATA_DIR + "2.2-lz4-bitrot"); + } + + @Test + public void test22_bitrot_ignored() throws Exception + { + try { + System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); + testRestore(DATA_DIR + "2.2-lz4-bitrot"); + } finally { + System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); + } + } + + @Test + public void test22_bitrot_ignore_policy() throws Exception + { + CommitFailurePolicy existingPolicy = DatabaseDescriptor.getCommitFailurePolicy(); + try { + DatabaseDescriptor.setCommitFailurePolicy(CommitFailurePolicy.ignore); + testRestore(DATA_DIR + "2.2-lz4-bitrot"); + } finally { + DatabaseDescriptor.setCommitFailurePolicy(existingPolicy); + } + } + + @Test(expected = CommitLogReplayException.class) + public void test22_bitrot2() throws Exception + { + testRestore(DATA_DIR + "2.2-lz4-bitrot2"); + } + + @Test + public void test22_bitrot2_ignored() throws Exception + { + try { + 
System.setProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY, "true"); + testRestore(DATA_DIR + "2.2-lz4-bitrot2"); + } finally { + System.clearProperty(CommitLogReplayer.IGNORE_REPLAY_ERRORS_PROPERTY); + } + } + @BeforeClass static public void initialize() throws FileNotFoundException, IOException, InterruptedException {