cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tylerho...@apache.org
Subject cassandra git commit: Fixes to index summary resampling on old sstable formats
Date Thu, 26 Mar 2015 22:37:14 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 93156d761 -> 7ff25f0df


Fixes to index summary resampling on old sstable formats

Patch by Tyler Hobbs; reviewed by Benedict Elliot Smith for
CASSANDRA-8993


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7ff25f0d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7ff25f0d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7ff25f0d

Branch: refs/heads/cassandra-2.1
Commit: 7ff25f0df55bf492e741730473b94bcba8ac6c0b
Parents: 93156d7
Author: Tyler Hobbs <tyler@datastax.com>
Authored: Thu Mar 26 17:36:19 2015 -0500
Committer: Tyler Hobbs <tyler@datastax.com>
Committed: Thu Mar 26 17:36:19 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../cassandra/io/sstable/Downsampling.java      | 22 ++---
 .../cassandra/io/sstable/IndexSummary.java      |  1 +
 .../io/sstable/IndexSummaryManager.java         | 15 ++-
 .../cassandra/io/sstable/SSTableReader.java     | 96 +++++++++++++++++---
 .../cassandra/io/sstable/IndexSummaryTest.java  |  2 +-
 6 files changed, 108 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9bc314d..dba397c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,7 @@
 2.1.4
+ * Avoid overwriting index summaries for sstables with an older format that
+   does not support downsampling; rebuild summaries on startup when this
+   is detected (CASSANDRA-8993)
  * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
  * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085)
  * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/Downsampling.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Downsampling.java b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
index 6842b25..8455d0b 100644
--- a/src/java/org/apache/cassandra/io/sstable/Downsampling.java
+++ b/src/java/org/apache/cassandra/io/sstable/Downsampling.java
@@ -79,8 +79,8 @@ public class Downsampling
      * Returns a list that can be used to translate current index summary indexes to their original index before
      * downsampling.  (This repeats every `samplingLevel`, so that's how many entries we return.)
      *
-     * For example, if [7, 15] is returned, the current index summary entry at index 0 was originally
-     * at index 7, and the current index 1 was originally at index 15.
+     * For example, if [0, 64] is returned, the current index summary entry at index 0 was originally
+     * at index 0, and the current index 1 was originally at index 64.
      *
      * @param samplingLevel the current sampling level for the index summary
      *
@@ -115,21 +115,11 @@ public class Downsampling
      */
     public static int getEffectiveIndexIntervalAfterIndex(int index, int samplingLevel, int minIndexInterval)
     {
-        assert index >= -1;
-        List<Integer> originalIndexes = getOriginalIndexes(samplingLevel);
-        if (index == -1)
-            return originalIndexes.get(0) * minIndexInterval;
-
+        assert index >= 0;
         index %= samplingLevel;
-        if (index == originalIndexes.size() - 1)
-        {
-            // account for partitions after the "last" entry as well as partitions before the "first" entry
-            return ((BASE_SAMPLING_LEVEL - originalIndexes.get(index)) + originalIndexes.get(0)) * minIndexInterval;
-        }
-        else
-        {
-            return (originalIndexes.get(index + 1) - originalIndexes.get(index)) * minIndexInterval;
-        }
+        List<Integer> originalIndexes = getOriginalIndexes(samplingLevel);
+        int nextEntryOriginalIndex = (index == originalIndexes.size() - 1) ? BASE_SAMPLING_LEVEL : originalIndexes.get(index + 1);
+        return (nextEntryOriginalIndex - originalIndexes.get(index)) * minIndexInterval;
     }
 
     public static int[] getStartPoints(int currentSamplingLevel, int newSamplingLevel)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index bad50b4..0ea0b48 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -86,6 +86,7 @@ public class IndexSummary extends WrappedSharedCloseable
         this.offsets = offsets;
         this.entries = entries;
         this.samplingLevel = samplingLevel;
+        assert samplingLevel > 0;
     }
 
     private IndexSummary(IndexSummary copy)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
index 4144c32..0c196ff 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java
@@ -259,6 +259,17 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         for (SSTableReader sstable : Iterables.concat(compacting, nonCompacting))
             total += sstable.getIndexSummaryOffHeapSize();
 
+        List<SSTableReader> oldFormatSSTables = new ArrayList<>();
+        for (SSTableReader sstable : nonCompacting)
+        {
+            // We can't change the sampling level of sstables with the old format, because the serialization format
+            // doesn't include the sampling level.  Leave this one as it is.  (See CASSANDRA-8993 for details.)
+            logger.trace("SSTable {} cannot be re-sampled due to old sstable format", sstable);
+            if (!sstable.descriptor.version.hasSamplingLevel)
+                oldFormatSSTables.add(sstable);
+        }
+        nonCompacting.removeAll(oldFormatSSTables);
+
         logger.debug("Beginning redistribution of index summaries for {} sstables with memory pool size {} MB; current spaced used is {} MB",
                      nonCompacting.size(), memoryPoolBytes / 1024L / 1024L, total / 1024.0 / 1024.0);
 
@@ -280,7 +291,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         Collections.sort(sstablesByHotness, new ReadRateComparator(readRates));
 
         long remainingBytes = memoryPoolBytes;
-        for (SSTableReader sstable : compacting)
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables))
             remainingBytes -= sstable.getIndexSummaryOffHeapSize();
 
         logger.trace("Index summaries for compacting SSTables are using {} MB of space",
@@ -288,7 +299,7 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean
         List<SSTableReader> newSSTables = adjustSamplingLevels(sstablesByHotness, totalReadsPerSec, remainingBytes);
 
         total = 0;
-        for (SSTableReader sstable : Iterables.concat(compacting, newSSTables))
+        for (SSTableReader sstable : Iterables.concat(compacting, oldFormatSSTables, newSSTables))
             total += sstable.getIndexSummaryOffHeapSize();
         logger.debug("Completed resizing of index summaries; current approximate memory used: {} MB",
                      total / 1024.0 / 1024.0);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index f5eef09..8fd7b85 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -26,16 +26,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -708,13 +699,39 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
                                          : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
         boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
+        boolean builtSummary = false;
         if (recreateBloomFilter || !summaryLoaded)
+        {
             buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
+            builtSummary = true;
+        }
 
         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
         dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
-        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) // save summary information to disk
+
+        // Check for an index summary that was downsampled even though the serialization format doesn't support
+        // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 for details.
+        if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel())
+        {
+            indexSummary.close();
+            ifile.close();
+            dfile.close();
+
+            logger.info("Detected erroneously downsampled index summary; will rebuild summary at full sampling");
+            FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY)));
+            ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
+            dbuilder = compression
+                       ? SegmentedFile.getCompressedBuilder()
+                       : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
+            buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
+            ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+            dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+            saveSummary(ibuilder, dbuilder);
+        }
+        else if (saveSummaryIfCreated && builtSummary)
+        {
             saveSummary(ibuilder, dbuilder);
+        }
     }
 
     /**
@@ -800,7 +817,9 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
         try
         {
             iStream = new DataInputStream(new FileInputStream(summariesFile));
-            indexSummary = IndexSummary.serializer.deserialize(iStream, partitioner, descriptor.version.hasSamplingLevel, metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
+            indexSummary = IndexSummary.serializer.deserialize(
+                    iStream, partitioner, descriptor.version.hasSamplingLevel,
+                    metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
             first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
@@ -826,6 +845,57 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
     }
 
     /**
+     * Validates that an index summary has full sampling, as expected when the serialization format does not support
+     * persisting the sampling level.
+     * @return true if the summary has full sampling, false otherwise
+     */
+    private boolean validateSummarySamplingLevel()
+    {
+        // We need to check index summary entries against the index to verify that none of them were dropped due to
+        // downsampling.  Downsampling can drop any of the first BASE_SAMPLING_LEVEL entries (repeating that drop pattern
+        // for the remainder of the summary).  Unfortunately, the first entry to be dropped is the entry at
+        // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of BASE_SAMPLING_LEVEL entries.
+        Iterator<FileDataInput> segments = ifile.iterator(0);
+        int i = 0;
+        int summaryEntriesChecked = 0;
+        int expectedIndexInterval = getMinIndexInterval();
+        while (segments.hasNext())
+        {
+            FileDataInput in = segments.next();
+            try
+            {
+                while (!in.isEOF())
+                {
+                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
+                    if (i % expectedIndexInterval == 0)
+                    {
+                        ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i / expectedIndexInterval));
+                        if (!summaryKey.equals(indexKey))
+                            return false;
+                        summaryEntriesChecked++;
+
+                        if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
+                            return true;
+                    }
+                    RowIndexEntry.Serializer.skip(in);
+                    i++;
+                }
+            }
+            catch (IOException e)
+            {
+                markSuspect();
+                throw new CorruptSSTableException(e, in.getPath());
+            }
+            finally
+            {
+                FileUtils.closeQuietly(in);
+            }
+        }
+
+        return true;
+    }
+
+    /**
      * Save index summary to Summary.db file.
      *
      * @param ibuilder
@@ -946,6 +1016,8 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
      */
     public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int samplingLevel) throws IOException
     {
+        assert descriptor.version.hasSamplingLevel;
+
         synchronized (tidy.global)
         {
             assert openReason != OpenReason.EARLY;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7ff25f0d/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
index 0760aa3..9ed5b32 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryTest.java
@@ -257,7 +257,7 @@ public class IndexSummaryTest
         assertEquals(128, BASE_SAMPLING_LEVEL);
         assertEquals(Arrays.asList(0, 32, 64, 96), Downsampling.getOriginalIndexes(4));
         assertEquals(Arrays.asList(0, 64), Downsampling.getOriginalIndexes(2));
-        assertEquals(Arrays.asList(), Downsampling.getOriginalIndexes(0));
+        assertEquals(Arrays.asList(0), Downsampling.getOriginalIndexes(1));
     }
 
     @Test


Mime
View raw message