cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdeggles...@apache.org
Subject [6/6] cassandra git commit: Merge branch 'cassandra-3.11' into trunk
Date Fri, 29 Sep 2017 22:38:59 GMT
Merge branch 'cassandra-3.11' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/77abf868
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/77abf868
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/77abf868

Branch: refs/heads/trunk
Commit: 77abf868a4f60f6978c8d3e334c1a2275c4c37a3
Parents: ebefc96 be21174
Author: Blake Eggleston <bdeggleston@gmail.com>
Authored: Fri Sep 29 15:33:44 2017 -0700
Committer: Blake Eggleston <bdeggleston@gmail.com>
Committed: Fri Sep 29 15:34:44 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/commitlog/CommitLogReader.java | 48 +++++++++++++++++-
 .../db/commitlog/CommitLogReplayer.java         |  7 ++-
 .../db/commitlog/CommitLogSegment.java          |  2 +-
 .../db/commitlog/CompressedSegment.java         |  2 +-
 .../db/commitlog/EncryptedSegment.java          |  4 +-
 .../db/commitlog/MemoryMappedSegment.java       |  2 +-
 .../cassandra/db/commitlog/CommitLogTest.java   | 52 ++++++++++++++++++++
 8 files changed, 109 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 99b5a59,a782333..1495c5d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -255,6 -115,6 +255,7 @@@ Merged from 2.1
   * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
   * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835)
  Merged from 3.0:
++ * Filter header only commit logs before recovery (CASSANDRA-13918)
   * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172)
   * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
   * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index 864325b,4d74557..75ef8e9
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@@ -35,7 -34,8 +35,8 @@@ import org.apache.cassandra.db.commitlo
  import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
  import org.apache.cassandra.db.partitions.PartitionUpdate;
  import org.apache.cassandra.db.rows.SerializationHelper;
 +import org.apache.cassandra.exceptions.UnknownTableException;
+ import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.util.ChannelProxy;
  import org.apache.cassandra.io.util.DataInputBuffer;
  import org.apache.cassandra.io.util.FileDataInput;
  import org.apache.cassandra.io.util.RandomAccessReader;
@@@ -78,6 -77,48 +79,44 @@@ public class CommitLogReade
          readAllFiles(handler, files, CommitLogPosition.NONE);
      }
  
+     private static boolean shouldSkip(File file) throws IOException, ConfigurationException
+     {
+         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
 -        if (desc.version < CommitLogDescriptor.VERSION_21)
 -        {
 -            return false;
 -        }
+         try(RandomAccessReader reader = RandomAccessReader.open(file))
+         {
+             CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
+             int end = reader.readInt();
+             long filecrc = reader.readInt() & 0xffffffffL;
+             return end == 0 && filecrc == 0;
+         }
+     }
+ 
 -    private static List<File> filterCommitLogFiles(File[] toFilter)
++    static List<File> filterCommitLogFiles(File[] toFilter)
+     {
+         List<File> filtered = new ArrayList<>(toFilter.length);
+         for (File file: toFilter)
+         {
+             try
+             {
+                 if (shouldSkip(file))
+                 {
+                     logger.info("Skipping playback of empty log: {}", file.getName());
+                 }
+                 else
+                 {
+                     filtered.add(file);
+                 }
+             }
+             catch (Exception e)
+             {
+                 // let recover deal with it
+                 filtered.add(file);
+             }
+         }
+ 
+         return filtered;
+     }
+ 
      /**
       * Reads all passed in files with minPosition, no start, and no mutation limit.
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index d1e63e6,ea62fd8..7cb277e
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -144,44 -134,9 +144,47 @@@ public class CommitLogReplayer implemen
  
      public void replayFiles(File[] clogs) throws IOException
      {
-         for (int i = 0; i < clogs.length; i++)
 -        commitLogReader.readAllFiles(this, clogs, globalPosition);
++        List<File> filteredLogs = CommitLogReader.filterCommitLogFiles(clogs);
++        int i = 0;
++        for (File file: filteredLogs)
 +        {
++            i++;
 +            sawCDCMutation = false;
-             commitLogReader.readCommitLogSegment(this, clogs[i], globalPosition, i == clogs.length - 1);
++            commitLogReader.readCommitLogSegment(this, file, globalPosition, i == filteredLogs.size());
 +            if (sawCDCMutation)
 +                handleCDCReplayCompletion(clogs[i]);
 +        }
      }
  
 +
 +    /**
 +     * Upon replay completion, CDC needs to hard-link files in the CDC folder and calculate index files so consumers can
 +     * begin their work.
 +     */
 +    private void handleCDCReplayCompletion(File f) throws IOException
 +    {
 +        // Can only reach this point if CDC is enabled, thus we have a CDCSegmentManager
 +        ((CommitLogSegmentManagerCDC)CommitLog.instance.segmentManager).addCDCSize(f.length());
 +
 +        File dest = new File(DatabaseDescriptor.getCDCLogLocation(), f.getName());
 +
 +        // If hard link already exists, assume it's from a previous node run. If people are mucking around in the cdc_raw
 +        // directory that's on them.
 +        if (!dest.exists())
 +            FileUtils.createHardLink(f, dest);
 +
 +        // The reader has already verified we can deserialize the descriptor.
 +        CommitLogDescriptor desc;
 +        try(RandomAccessReader reader = RandomAccessReader.open(f))
 +        {
 +            desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext());
 +            assert desc != null;
 +            assert f.length() < Integer.MAX_VALUE;
 +            CommitLogSegment.writeCDCIndexFile(desc, (int)f.length(), true);
 +        }
 +    }
 +
 +
      /**
       * Flushes all keyspaces associated with this replayer in parallel, blocking until their flushes are complete.
       * @return the number of mutations replayed

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/77abf868/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
index 956a05d,267813e..1060a72
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java
@@@ -156,16 -156,74 +157,67 @@@ public class CommitLogTes
      }
  
      @Test
 -    public void testRecoveryWithFinalEmptyLog() throws Exception
 +    public void testRecoveryWithEmptyFinalLog() throws Exception
      {
 -        // Even though it's empty, it's the last commitlog segment, so allowTruncation=true should allow it to pass
 -        CommitLog.instance.recoverFiles(new File[]{tmpFile(CommitLogDescriptor.current_version)});
 +        CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.current_version));
      }
  
+     /**
+      * Since commit log segments can be allocated before they're needed, the commit log file with the highest
+      * id isn't neccesarily the last log that we wrote to. We should remove header only logs on recover so we
+      * can tolerate truncated logs
+      */
+     @Test
+     public void testHeaderOnlyFileFiltering() throws Exception
+     {
+         File directory = Files.createTempDir();
+ 
+         CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null, DatabaseDescriptor.getEncryptionContext());
+         CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null, DatabaseDescriptor.getEncryptionContext());
+ 
+         ByteBuffer buffer;
+ 
+         // this has a header and malformed data
+         File file1 = new File(directory, desc1.fileName());
+         buffer = ByteBuffer.allocate(1024);
+         CommitLogDescriptor.writeHeader(buffer, desc1);
+         int pos = buffer.position();
+         CommitLogSegment.writeSyncMarker(desc1.id, buffer, buffer.position(), buffer.position(), buffer.position() + 128);
+         buffer.position(pos + 8);
+         buffer.putInt(5);
+         buffer.putInt(6);
+ 
+         try (OutputStream lout = new FileOutputStream(file1))
+         {
+             lout.write(buffer.array());
+         }
+ 
+         // this has only a header
+         File file2 = new File(directory, desc2.fileName());
+         buffer = ByteBuffer.allocate(1024);
+         CommitLogDescriptor.writeHeader(buffer, desc2);
+         try (OutputStream lout = new FileOutputStream(file2))
+         {
+             lout.write(buffer.array());
+         }
+ 
+         // one corrupt file and one header only file should be ok
+         runExpecting(() -> {
+             CommitLog.instance.recoverFiles(file1, file2);
+             return null;
+         }, null);
+ 
+         // 2 corrupt files and one header only file should fail
+         runExpecting(() -> {
+             CommitLog.instance.recoverFiles(file1, file1, file2);
+             return null;
+         }, CommitLogReplayException.class);
+     }
  
      @Test
 -    public void testRecoveryWithEmptyLog20() throws Exception
 -    {
 -        CommitLog.instance.recoverFiles(tmpFile(CommitLogDescriptor.VERSION_20));
 -    }
 -
 -    @Test
      public void testRecoveryWithZeroLog() throws Exception
      {
 -        testRecovery(new byte[10], null);
 +        testRecovery(new byte[10], CommitLogReplayException.class);
      }
  
      @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message