cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jji...@apache.org
Subject [5/6] cassandra git commit: Merge branch 'cassandra-3.0' into cassandra-3.11
Date Tue, 05 Sep 2017 18:49:50 GMT
Merge branch 'cassandra-3.0' into cassandra-3.11


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f5f7028
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f5f7028
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f5f7028

Branch: refs/heads/cassandra-3.11
Commit: 3f5f70281ee87c09d312d2169d6335e5b0abf815
Parents: d44a0d2 0493545
Author: Jeff Jirsa <jjirsa@apple.com>
Authored: Tue Sep 5 11:47:18 2017 -0700
Committer: Jeff Jirsa <jjirsa@apple.com>
Committed: Tue Sep 5 11:47:55 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 .../apache/cassandra/db/commitlog/CommitLogReader.java  |  3 ++-
 .../apache/cassandra/db/commitlog/CommitLogTest.java    | 12 +++++++++++-
 3 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f5f7028/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0faed3d,a3eccf2..5a53e50
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,10 +1,19 @@@
 -3.0.15
 +3.11.1
 + * Add a compaction option to TWCS to ignore sstables overlapping checks (CASSANDRA-13418)
 + * BTree.Builder memory leak (CASSANDRA-13754)
 + * Revert CASSANDRA-10368 of supporting non-pk column filtering due to correctness (CASSANDRA-13798)
 + * Fix cassandra-stress hang issues when an error during cluster connection happens (CASSANDRA-12938)
 + * Better bootstrap failure message when blocked by (potential) range movement (CASSANDRA-13744)
 + * "ignore" option is ignored in sstableloader (CASSANDRA-13721)
 + * Deadlock in AbstractCommitLogSegmentManager (CASSANDRA-13652)
 + * Duplicate the buffer before passing it to analyser in SASI operation (CASSANDRA-13512)
 + * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 +Merged from 3.0:
+  * Better handle corrupt final commitlog segment (CASSANDRA-11995)
   * StreamingHistogram is not thread safe (CASSANDRA-13756)
   * Fix MV timestamp issues (CASSANDRA-11500)
 - * Better tolerate improperly formatted bcrypt hashes (CASSANDRA-13626) 
 + * Better tolerate improperly formatted bcrypt hashes (CASSANDRA-13626)
   * Fix race condition in read command serialization (CASSANDRA-13363)
 - * Enable segement creation before recovering commitlogs (CASSANDRA-13587)
   * Fix AssertionError in short read protection (CASSANDRA-13747)
   * Don't skip corrupted sstables on startup (CASSANDRA-13620)
   * Fix the merging of cells with different user type versions (CASSANDRA-13776)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f5f7028/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index d1cb8d6,0000000..8c04329
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -1,514 -1,0 +1,515 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db.commitlog;
 +
 +import java.io.*;
 +import java.util.*;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.zip.CRC32;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.commons.lang3.StringUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.UnknownColumnFamilyException;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
 +import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
 +import org.apache.cassandra.db.partitions.PartitionUpdate;
 +import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.io.util.DataInputBuffer;
 +import org.apache.cassandra.io.util.FileDataInput;
 +import org.apache.cassandra.io.util.RandomAccessReader;
 +import org.apache.cassandra.io.util.RebufferingInputStream;
 +import org.apache.cassandra.utils.JVMStabilityInspector;
 +
 +import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;
 +
 +public class CommitLogReader
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);
 +
 +    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;
 +
 +    @VisibleForTesting
 +    public static final int ALL_MUTATIONS = -1;
 +    private final CRC32 checksum;
 +    private final Map<UUID, AtomicInteger> invalidMutations;
 +
 +    private byte[] buffer;
 +
 +    public CommitLogReader()
 +    {
 +        checksum = new CRC32();
 +        invalidMutations = new HashMap<>();
 +        buffer = new byte[4096];
 +    }
 +
 +    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
 +    {
 +        return invalidMutations.entrySet();
 +    }
 +
 +    /**
 +     * Reads all passed in files with no minimum, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException
 +    {
 +        readAllFiles(handler, files, CommitLogPosition.NONE);
 +    }
 +
 +    /**
 +     * Reads all passed in files with minPosition, no start, and no mutation limit.
 +     */
 +    public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition
minPosition) throws IOException
 +    {
 +        for (int i = 0; i < files.length; i++)
 +            readCommitLogSegment(handler, files[i], minPosition, ALL_MUTATIONS, i + 1 ==
files.length);
 +    }
 +
 +    /**
 +     * Reads passed in file fully
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation)
throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads passed in file fully, up to mutationLimit count
 +     */
 +    @VisibleForTesting
 +    public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit,
boolean tolerateTruncation) throws IOException
 +    {
 +        readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation);
 +    }
 +
 +    /**
 +     * Reads mutations from file, handing them off to handler
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param file CommitLogSegment file to read
 +     * @param minPosition Optional minimum CommitLogPosition - all segments with id >
or matching w/greater position will be read
 +     * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS
serves as marker to play all.
 +     * @param tolerateTruncation Whether or not we should allow truncation of this file
or throw if EOF found
 +     *
 +     * @throws IOException
 +     */
 +    public void readCommitLogSegment(CommitLogReadHandler handler,
 +                                     File file,
 +                                     CommitLogPosition minPosition,
 +                                     int mutationLimit,
 +                                     boolean tolerateTruncation) throws IOException
 +    {
 +        // just transform from the file name (no reading of headers) to determine version
 +        CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 +
 +        try(RandomAccessReader reader = RandomAccessReader.open(file))
 +        {
 +            if (desc.version < CommitLogDescriptor.VERSION_21)
 +            {
 +                if (!shouldSkipSegmentId(file, desc, minPosition))
 +                {
 +                    if (minPosition.segmentId == desc.id)
 +                        reader.seek(minPosition.position);
 +                    ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit,
tolerateTruncation);
 +                    statusTracker.errorContext = desc.fileName();
 +                    readSection(handler, reader, minPosition, (int) reader.length(), statusTracker,
desc);
 +                }
 +                return;
 +            }
 +
 +            final long segmentIdFromFilename = desc.id;
 +            try
 +            {
 +                // The following call can either throw or legitimately return null. For
either case, we need to check
 +                // desc outside this block and set it to null in the exception case.
 +                desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
 +            }
 +            catch (Exception e)
 +            {
 +                desc = null;
 +            }
 +            if (desc == null)
 +            {
 +                // don't care about whether or not the handler thinks we can continue. We
can't w/out descriptor.
++                // whether or not we continue with startup will depend on whether this is
the last segment
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Could not read commit log descriptor in file %s", file),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR,
-                     false));
++                    tolerateTruncation));
 +                return;
 +            }
 +
 +            if (segmentIdFromFilename != desc.id)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format(
 +                    "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename,
desc.id, file),
 +                                                                                CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR,
 +                                                                                false)))
 +                {
 +                    return;
 +                }
 +            }
 +
 +            if (shouldSkipSegmentId(file, desc, minPosition))
 +                return;
 +
 +            CommitLogSegmentReader segmentReader;
 +            try
 +            {
 +                segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation);
 +            }
 +            catch(Exception e)
 +            {
 +                handler.handleUnrecoverableError(new CommitLogReadException(
 +                    String.format("Unable to create segment reader for commit log file:
%s", e),
 +                    CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR,
 +                    tolerateTruncation));
 +                return;
 +            }
 +
 +            try
 +            {
 +                ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation);
 +                for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader)
 +                {
 +                    // Only tolerate truncation if we allow in both global and segment
 +                    statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection;
 +
 +                    // Skip segments that are completely behind the desired minPosition
 +                    if (desc.id == minPosition.segmentId && syncSegment.endPosition
< minPosition.position)
 +                        continue;
 +
 +                    statusTracker.errorContext = String.format("Next section at %d in %s",
syncSegment.fileStartPosition, desc.fileName());
 +
 +                    readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition,
statusTracker, desc);
 +                    if (!statusTracker.shouldContinue())
 +                        break;
 +                }
 +            }
 +            // Unfortunately AbstractIterator cannot throw a checked exception, so we check
to see if a RuntimeException
 +            // is wrapping an IOException.
 +            catch (RuntimeException re)
 +            {
 +                if (re.getCause() instanceof IOException)
 +                    throw (IOException) re.getCause();
 +                throw re;
 +            }
 +            logger.debug("Finished reading {}", file);
 +        }
 +    }
 +
 +    /**
 +     * Any segment with id >= minPosition.segmentId is a candidate for read.
 +     */
 +    private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition
minPosition)
 +    {
 +        logger.debug("Reading {} (CL version {}, messaging version {}, compression {})",
 +            file.getPath(),
 +            desc.version,
 +            desc.getMessagingVersion(),
 +            desc.compression);
 +
 +        if (minPosition.segmentId > desc.id)
 +        {
 +            logger.trace("Skipping read of fully-flushed {}", file);
 +            return true;
 +        }
 +        return false;
 +    }
 +
 +    /**
 +     * Reads a section of a file containing mutations
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param reader FileDataInput / logical buffer containing commitlog mutations
 +     * @param minPosition CommitLogPosition indicating when we should start actively replaying
mutations
 +     * @param end logical numeric end of the segment being read
 +     * @param statusTracker ReadStatusTracker with current state of mutation count, error
state, etc
 +     * @param desc Descriptor for CommitLog serialization
 +     */
 +    private void readSection(CommitLogReadHandler handler,
 +                             FileDataInput reader,
 +                             CommitLogPosition minPosition,
 +                             int end,
 +                             ReadStatusTracker statusTracker,
 +                             CommitLogDescriptor desc) throws IOException
 +    {
 +        // seek rather than deserializing mutation-by-mutation to reach the desired minPosition
in this SyncSegment
 +        if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position)
 +            reader.seek(minPosition.position);
 +
 +        while (statusTracker.shouldContinue() && reader.getFilePointer() < end
&& !reader.isEOF())
 +        {
 +            long mutationStart = reader.getFilePointer();
 +            if (logger.isTraceEnabled())
 +                logger.trace("Reading mutation at {}", mutationStart);
 +
 +            long claimedCRC32;
 +            int serializedSize;
 +            try
 +            {
 +                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER)
to identify the end
 +                // of a segment, which happens naturally due to the 0 padding of the empty
segment on creation.
 +                // However, it's possible with 2.1 era commitlogs that the last mutation
ended less than 4 bytes
 +                // from the end of the file, which means that we'll be unable to read an
a full int and instead
 +                // read an EOF here
 +                if(end - reader.getFilePointer() < 4)
 +                {
 +                    logger.trace("Not enough bytes left for another mutation in this CommitLog
segment, continuing");
 +                    statusTracker.requestTermination();
 +                    return;
 +                }
 +
 +                // any of the reads may hit EOF
 +                serializedSize = reader.readInt();
 +                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
 +                {
 +                    logger.trace("Encountered end of segment marker at {}", reader.getFilePointer());
 +                    statusTracker.requestTermination();
 +                    return;
 +                }
 +
 +                // Mutation must be at LEAST 10 bytes:
 +                //    3 for a non-empty Keyspace
 +                //    3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
 +                //    4 bytes for column count.
 +                // This prevents CRC by being fooled by special-case garbage in the file;
see CASSANDRA-2128
 +                if (serializedSize < 10)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Invalid mutation size
%d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader,
desc.version);
 +                checksum.reset();
 +                CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version);
 +
 +                if (checksum.getValue() != claimedSizeChecksum)
 +                {
 +                    if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                    String.format("Mutation size checksum
failure at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                    CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                    statusTracker.tolerateErrorsInSection)))
 +                    {
 +                        statusTracker.requestTermination();
 +                    }
 +                    return;
 +                }
 +
 +                if (serializedSize > buffer.length)
 +                    buffer = new byte[(int) (1.2 * serializedSize)];
 +                reader.readFully(buffer, 0, serializedSize);
 +
 +                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version);
 +            }
 +            catch (EOFException eof)
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Unexpected end of segment
at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.EOF,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                return;
 +            }
 +
 +            checksum.update(buffer, 0, serializedSize);
 +            if (claimedCRC32 != checksum.getValue())
 +            {
 +                if (handler.shouldSkipSegmentOnError(new CommitLogReadException(
 +                                                String.format("Mutation checksum failure
at %d in %s", mutationStart, statusTracker.errorContext),
 +                                                CommitLogReadErrorReason.MUTATION_ERROR,
 +                                                statusTracker.tolerateErrorsInSection)))
 +                {
 +                    statusTracker.requestTermination();
 +                }
 +                continue;
 +            }
 +
 +            long mutationPosition = reader.getFilePointer();
 +            readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition,
desc);
 +
 +            // Only count this as a processed mutation if it is after our min as we suppress
reading of mutations that
 +            // are before this mark.
 +            if (mutationPosition >= minPosition.position)
 +                statusTracker.addProcessedMutation();
 +        }
 +    }
 +
 +    /**
 +     * Deserializes and passes a Mutation to the ICommitLogReadHandler requested
 +     *
 +     * @param handler Handler that will take action based on deserialized Mutations
 +     * @param inputBuffer raw byte array w/Mutation data
 +     * @param size deserialized size of mutation
 +     * @param minPosition We need to suppress replay of mutations that are before the required
minPosition
 +     * @param entryLocation filePointer offset of mutation within CommitLogSegment
 +     * @param desc CommitLogDescriptor being worked on
 +     */
 +    @VisibleForTesting
 +    protected void readMutation(CommitLogReadHandler handler,
 +                                byte[] inputBuffer,
 +                                int size,
 +                                CommitLogPosition minPosition,
 +                                final int entryLocation,
 +                                final CommitLogDescriptor desc) throws IOException
 +    {
 +        // For now, we need to go through the motions of deserializing the mutation to determine
its size and move
 +        // the file pointer forward accordingly, even if we're behind the requested minPosition
within this SyncSegment.
 +        boolean shouldReplay = entryLocation > minPosition.position;
 +
 +        final Mutation mutation;
 +        try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size))
 +        {
 +            mutation = Mutation.serializer.deserialize(bufIn,
 +                                                       desc.getMessagingVersion(),
 +                                                       SerializationHelper.Flag.LOCAL);
 +            // doublecheck that what we read is still] valid for the current schema
 +            for (PartitionUpdate upd : mutation.getPartitionUpdates())
 +                upd.validate();
 +        }
 +        catch (UnknownColumnFamilyException ex)
 +        {
 +            if (ex.cfId == null)
 +                return;
 +            AtomicInteger i = invalidMutations.get(ex.cfId);
 +            if (i == null)
 +            {
 +                i = new AtomicInteger(1);
 +                invalidMutations.put(ex.cfId, i);
 +            }
 +            else
 +                i.incrementAndGet();
 +            return;
 +        }
 +        catch (Throwable t)
 +        {
 +            JVMStabilityInspector.inspectThrowable(t);
 +            File f = File.createTempFile("mutation", "dat");
 +
 +            try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f)))
 +            {
 +                out.write(inputBuffer, 0, size);
 +            }
 +
 +            // Checksum passed so this error can't be permissible.
 +            handler.handleUnrecoverableError(new CommitLogReadException(
 +                String.format(
 +                    "Unexpected error deserializing mutation; saved to %s.  " +
 +                    "This may be caused by replaying a mutation against a table with the
same name but incompatible schema.  " +
 +                    "Exception follows: %s", f.getAbsolutePath(), t),
 +                CommitLogReadErrorReason.MUTATION_ERROR,
 +                false));
 +            return;
 +        }
 +
 +        if (logger.isTraceEnabled())
 +            logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(),
 +                         "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(),
", ") + "}");
 +
 +        if (shouldReplay)
 +            handler.handleMutation(mutation, size, entryLocation, desc);
 +    }
 +
 +    /**
 +     * Helper methods to deal with changing formats of internals of the CommitLog without
polluting deserialization code.
 +     */
 +    private static class CommitLogFormat
 +    {
 +        public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion)
throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +
 +        public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion)
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                    checksum.update(serializedSize);
 +                    break;
 +                // Changed format in 2.0
 +                default:
 +                    updateChecksumInt(checksum, serializedSize);
 +                    break;
 +            }
 +        }
 +
 +        public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion)
throws IOException
 +        {
 +            switch (commitLogVersion)
 +            {
 +                case CommitLogDescriptor.VERSION_12:
 +                case CommitLogDescriptor.VERSION_20:
 +                    return input.readLong();
 +                // Changed format in 2.1
 +                default:
 +                    return input.readInt() & 0xffffffffL;
 +            }
 +        }
 +    }
 +
 +    private static class ReadStatusTracker
 +    {
 +        private int mutationsLeft;
 +        public String errorContext = "";
 +        public boolean tolerateErrorsInSection;
 +        private boolean error;
 +
 +        public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection)
 +        {
 +            this.mutationsLeft = mutationLimit;
 +            this.tolerateErrorsInSection = tolerateErrorsInSection;
 +        }
 +
 +        public void addProcessedMutation()
 +        {
 +            if (mutationsLeft == ALL_MUTATIONS)
 +                return;
 +            --mutationsLeft;
 +        }
 +
 +        public boolean shouldContinue()
 +        {
 +            return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS);
 +        }
 +
 +        public void requestTermination()
 +        {
 +            error = true;
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f5f7028/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 4000fbf,1543415..aab55a5
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -145,16 -117,27 +145,26 @@@ public class CommitLogTes
      @Test
      public void testRecoveryWithEmptyLog() throws Exception
      {
 -        // The first empty file we expect to throw as it's invalid
 -        // We need to pass the second as well, because allowTruncation will be set to true
for the final segment
          runExpecting(() -> {
-             CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.current_version));
 -            CommitLog.instance.recover(new File[]{
 -                    tmpFile(CommitLogDescriptor.current_version),
 -                    tmpFile(CommitLogDescriptor.current_version)  });
++            CommitLog.instance.recoverFiles(new File[]{
++            tmpFile(CommitLogDescriptor.current_version),
++            tmpFile(CommitLogDescriptor.current_version)
++            });
              return null;
          }, CommitLogReplayException.class);
      }
  
      @Test
 -    public void testRecoveryWithEmptyFinalLog() throws Exception
++    public void testRecoveryWithFinalEmptyLog() throws Exception
+     {
+         // Even though it's empty, it's the last commitlog segment, so allowTruncation=true
should allow it to pass
 -        CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version)
 });
++        CommitLog.instance.recoverFiles(new File[]{tmpFile(CommitLogDescriptor.current_version)});
+     }
+ 
+     @Test
      public void testRecoveryWithEmptyLog20() throws Exception
      {
 -        CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.VERSION_20) });
 +        CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.VERSION_20));
      }
  
      @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message