cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [05/12] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Date Tue, 07 Jul 2015 15:46:10 GMT
Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/compaction/Scrubber.java
	src/java/org/apache/cassandra/io/sstable/SSTableReader.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4c94ef20
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4c94ef20
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4c94ef20

Branch: refs/heads/cassandra-2.2
Commit: 4c94ef20d3562ab8f0a922945d78464d6c475d98
Parents: 4de943f 452d6a4
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Tue Jul 7 16:27:58 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Tue Jul 7 16:27:58 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 ++
 .../cassandra/db/compaction/Scrubber.java       | 37 +++++++++++---
 .../cassandra/io/sstable/SSTableReader.java     | 52 ++++++++++++++------
 .../cassandra/tools/StandaloneScrubber.java     |  2 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java | 25 ++++++++++
 5 files changed, 97 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 95dc8be,bd1db92..2cbc7c4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,8 +1,23 @@@
 -2.0.18
 -* Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
 -
 -
 -2.0.17
++2.1.9
++Merged from 2.0:
++ * Scrub (recover) sstables even when -Index.db is missing, (CASSANDRA-9591)
++
++
 +2.1.8
 + * (cqlsh) Fix bad check for CQL compatibility when DESCRIBE'ing
 +   COMPACT STORAGE tables with no clustering columns
 + * Warn when an extra-large partition is compacted (CASSANDRA-9643)
 + * Eliminate strong self-reference chains in sstable ref tidiers (CASSANDRA-9656)
 + * Ensure StreamSession uses canonical sstable reader instances (CASSANDRA-9700) 
 + * Ensure memtable book keeping is not corrupted in the event we shrink usage (CASSANDRA-9681)
 + * Update internal python driver for cqlsh (CASSANDRA-9064)
 + * Fix IndexOutOfBoundsException when inserting tuple with too many
 +   elements using the string literal notation (CASSANDRA-9559)
 + * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
 + * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
 + * Enable describe on indices (CASSANDRA-7814)
 + * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
 +Merged from 2.0:
   * Avoid NPE in AuthSuccess#decode (CASSANDRA-9727)
   * Add listen_address to system.local (CASSANDRA-9603)
   * Bug fixes to resultset metadata construction (CASSANDRA-9636)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index ce98a13,dc60efa..b1c12e0
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -100,8 -95,17 +100,18 @@@ public class Scrubber implements Closea
          this.controller = isOffline
                          ? new ScrubController(cfs)
                          : new CompactionController(cfs, Collections.singleton(sstable),
CompactionManager.getDefaultGcBefore(cfs));
 -        this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative();
 +        this.isCommutative = cfs.metadata.isCounter();
-         this.expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub)));
+ 
+         boolean hasIndexFile = (new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))).exists();
+         if (!hasIndexFile)
+         {
+             // if there's any corruption in the -Data.db then rows can't be skipped over.
but it's worth a shot.
+             outputHandler.warn("Missing component: " + sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+         }
+ 
 -        this.expectedBloomFilterSize = Math.max(cfs.metadata.getIndexInterval(),
 -                hasIndexFile ? (int)(SSTableReader.getApproximateKeyCount(toScrub,cfs.metadata))
: 0);
++        this.expectedBloomFilterSize = Math.max(
++            cfs.metadata.getMinIndexInterval(),
++            hasIndexFile ? (int)(SSTableReader.getApproximateKeyCount(toScrub)) : 0);
  
          // loop through each row, deserializing to check for damage.
          // we'll also loop through the index at the same time, using the position from the
index to recover if the
@@@ -120,14 -128,13 +134,15 @@@
      public void scrub()
      {
          outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
 +        Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
 +        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge,
isOffline);
          try
          {
-             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
+             nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile)
: null;
+             if (indexAvailable())
              {
                  // throw away variable so we don't have a side effect in the assert
 -                long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile,
sstable.descriptor.version).position;
 +                long firstRowPositionFromIndex = sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile,
sstable.descriptor.version).position;
                  assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
              }
  
@@@ -167,13 -179,25 +182,13 @@@
                      dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex;
                  }
  
 -                if (!sstable.descriptor.version.hasRowSizeAndColumnCount)
 -                {
 -                    dataSize = dataSizeFromIndex;
 -                    // avoid an NPE if key is null
 -                    String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.key);
 -                    outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
 -                }
 -                else
 -                {
 -                    if (currentIndexKey != null)
 -                        outputHandler.debug(String.format("Index doublecheck: row %s is
%s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
 -                }
 +                dataSize = dataSizeFromIndex;
 +                // avoid an NPE if key is null
 +                String keyName = key == null ? "(unreadable key)" : ByteBufferUtil.bytesToHex(key.getKey());
 +                outputHandler.debug(String.format("row %s is %s bytes", keyName, dataSize));
  
-                 assert currentIndexKey != null || indexFile.isEOF();
+                 assert currentIndexKey != null || !indexAvailable();
  
 -                writer.mark();
                  try
                  {
                      if (key == null)
@@@ -188,11 -212,11 +203,11 @@@
                      if (dataSize > dataFile.length())
                          throw new IOError(new IOException("Impossible row size (greater
than file length): " + dataSize));
  
-                     if (dataStart != dataStartFromIndex)
+                     if (indexFile != null && dataStart != dataStartFromIndex)
                          outputHandler.warn(String.format("Data file row position %d differs
from index file row position %d", dataStart, dataStartFromIndex));
  
-                     if (dataSize != dataSizeFromIndex)
+                     if (indexFile != null && dataSize != dataSizeFromIndex)
 -                        outputHandler.warn(String.format("Data file row size %d differs
from index file row size %d", dataSize, dataSizeFromIndex));
 +                        outputHandler.warn(String.format("Data file row size %d different
from index file row size %d", dataSize, dataSizeFromIndex));
  
                      SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable,
dataFile, key, dataSize, validateColumns);
                      if (prevKey != null && prevKey.compareTo(key) > 0)
@@@ -312,10 -331,11 +327,11 @@@
          currentRowPositionFromIndex = nextRowPositionFromIndex;
          try
          {
-             nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile);
-             nextRowPositionFromIndex = indexFile.isEOF()
+             nextIndexKey = !indexAvailable() ? null : ByteBufferUtil.readWithShortLength(indexFile);
+ 
+             nextRowPositionFromIndex = !indexAvailable()
                      ? dataFile.length()
 -                    : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
 +                    : sstable.metadata.comparator.rowIndexEntrySerializer().deserialize(indexFile,
sstable.descriptor.version).position;
          }
          catch (Throwable th)
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 7551d46,8919a09..6879834
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -448,27 -190,9 +448,27 @@@ public class SSTableReader extends SSTa
                                        IPartitioner partitioner,
                                        boolean validate) throws IOException
      {
 -        long start = System.nanoTime();
 -        SSTableMetadata sstableMetadata = openMetadata(descriptor, components, partitioner,
validate);
 +        // Minimum components without which we can't do anything
 +        assert components.contains(Component.DATA) : "Data component is missing for sstable"
+ descriptor;
-         assert components.contains(Component.PRIMARY_INDEX) : "Primary index component is
missing for sstable " + descriptor;
++        assert !validate || components.contains(Component.PRIMARY_INDEX) : "Primary index
component is missing for sstable " + descriptor;
 +
 +        Map<MetadataType, MetadataComponent> sstableMetadata = descriptor.getMetadataSerializer().deserialize(descriptor,
-                                                                                        
                       EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
++                                                                                       
                      EnumSet.of(MetadataType.VALIDATION, MetadataType.STATS));
 +        ValidationMetadata validationMetadata = (ValidationMetadata) sstableMetadata.get(MetadataType.VALIDATION);
 +        StatsMetadata statsMetadata = (StatsMetadata) sstableMetadata.get(MetadataType.STATS);
 +
 +        // Check if sstable is created using same partitioner.
 +        // Partitioner can be null, which indicates older version of sstable or no stats
available.
 +        // In that case, we skip the check.
 +        String partitionerName = partitioner.getClass().getCanonicalName();
 +        if (validationMetadata != null && !partitionerName.equals(validationMetadata.partitioner))
 +        {
 +            logger.error(String.format("Cannot open %s; partitioner %s does not match system
partitioner %s.  Note that the default partitioner starting with Cassandra 1.2 is Murmur3Partitioner,
so you will need to edit that to match your old partitioner if upgrading.",
 +                                              descriptor, validationMetadata.partitioner,
partitionerName));
 +            System.exit(1);
 +        }
  
 +        logger.info("Opening {} ({} bytes)", descriptor, new File(descriptor.filenameFor(Component.DATA)).length());
          SSTableReader sstable = new SSTableReader(descriptor,
                                                    components,
                                                    metadata,
@@@ -603,48 -374,35 +603,43 @@@
          this.dfile = dfile;
          this.indexSummary = indexSummary;
          this.bf = bloomFilter;
 +        this.setup(false);
      }
  
 -    /**
 -     * Clean up all opened resources.
 -     *
 -     * @throws IOException
 -     */
 -    public void close() throws IOException
 +    public static long getTotalBytes(Iterable<SSTableReader> sstables)
      {
 -        if (readMeterSyncFuture != null)
 -            readMeterSyncFuture.cancel(false);
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +            sum += sstable.onDiskLength();
 +        return sum;
 +    }
  
 -        // Force finalizing mmapping if necessary
 +    public static long getTotalUncompressedBytes(Iterable<SSTableReader> sstables)
 +    {
 +        long sum = 0;
 +        for (SSTableReader sstable : sstables)
 +            sum += sstable.uncompressedLength();
  
 -        if (null != ifile)
 -            ifile.cleanup();
 +        return sum;
 +    }
 +
 +    public boolean equals(Object that)
 +    {
 +        return that instanceof SSTableReader && ((SSTableReader) that).descriptor.equals(this.descriptor);
 +    }
  
 -        dfile.cleanup();
 -        // close the BF so it can be opened later.
 -        if (null != bf)
 -            bf.close();
 +    public int hashCode()
 +    {
 +        return this.descriptor.hashCode();
 +    }
  
 -        if (null != indexSummary)
 -            indexSummary.close();
 +    public String getFilename()
 +    {
 +        return dfile.path;
      }
  
-     public String getIndexFilename()
-     {
-         return ifile.path;
-     }
- 
 -    public void setTrackedBy(DataTracker tracker)
 +    public void setupKeyCache()
      {
 -        deletingTask.setTracker(tracker);
          // under normal operation we can do this at any time, but SSTR is also used outside
C* proper,
          // e.g. by BulkLoader, which does not initialize the cache.  As a kludge, we set
up the cache
          // here when we know we're being wired into the rest of the server infrastructure.
@@@ -659,7 -417,13 +654,13 @@@
              load(false, true);
              bf = FilterFactory.AlwaysPresent;
          }
+         else if (!components.contains(Component.PRIMARY_INDEX))
+         {
+             // avoid any reading of the missing primary index component.
 -            // this should only happen during StandaloneScrubber
++            // this should only happen for standalone tools
+             load(false, false);
+         }
 -        else if (!components.contains(Component.FILTER))
 +        else if (!components.contains(Component.FILTER) || validation == null)
          {
              // bf is enabled, but filter component is missing.
              load(true, true);
@@@ -708,418 -467,26 +709,427 @@@
                                           ? SegmentedFile.getCompressedBuilder()
                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
  
 -        boolean summaryLoaded = loadSummary(this, ibuilder, dbuilder, metadata);
 +        boolean summaryLoaded = loadSummary(ibuilder, dbuilder);
 +        boolean builtSummary = false;
          if (recreateBloomFilter || !summaryLoaded)
 -            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded);
 +        {
 +            buildSummary(recreateBloomFilter, ibuilder, dbuilder, summaryLoaded, Downsampling.BASE_SAMPLING_LEVEL);
 +            builtSummary = true;
 +        }
  
-         ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         if (components.contains(Component.PRIMARY_INDEX))
+             ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
+ 
          dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 -        if (saveSummaryIfCreated && (recreateBloomFilter || !summaryLoaded)) //
save summary information to disk
 -            saveSummary(this, ibuilder, dbuilder);
 +
 +        // Check for an index summary that was downsampled even though the serialization
format doesn't support
 +        // that.  If it was downsampled, rebuild it.  See CASSANDRA-8993 for details.
-         if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel())
++        if (!descriptor.version.hasSamplingLevel && !builtSummary && !validateSummarySamplingLevel()
&& ifile != null)
 +        {
 +            indexSummary.close();
 +            ifile.close();
 +            dfile.close();
 +
 +            logger.info("Detected erroneously downsampled index summary; will rebuild summary
at full sampling");
 +            FileUtils.deleteWithConfirm(new File(descriptor.filenameFor(Component.SUMMARY)));
 +            ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +            dbuilder = compression
 +                       ? SegmentedFile.getCompressedBuilder()
 +                       : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +            buildSummary(false, ibuilder, dbuilder, false, Downsampling.BASE_SAMPLING_LEVEL);
 +            ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
 +            dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +            saveSummary(ibuilder, dbuilder);
 +        }
 +        else if (saveSummaryIfCreated && builtSummary)
 +        {
 +            saveSummary(ibuilder, dbuilder);
 +        }
      }
  
 -     private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder,
SegmentedFile.Builder dbuilder, boolean summaryLoaded) throws IOException
 -     {
 +    /**
 +     * Build index summary(and optionally bloom filter) by reading through Index.db file.
 +     *
 +     * @param recreateBloomFilter true if recreate bloom filter
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @param summaryLoaded true if index summary is already loaded and not need to build
again
 +     * @throws IOException
 +     */
 +    private void buildSummary(boolean recreateBloomFilter, SegmentedFile.Builder ibuilder,
SegmentedFile.Builder dbuilder, boolean summaryLoaded, int samplingLevel) throws IOException
 +    {
+          if (!components.contains(Component.PRIMARY_INDEX))
+              return;
+ 
          // we read the positions in a BRAF so we don't have to worry about an entry spanning
a mmap boundary.
          RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
 -
 +
 +        try
 +        {
 +            long indexSize = primaryIndex.length();
 +            long histogramCount = sstableMetadata.estimatedRowSize.count();
 +            long estimatedKeys = histogramCount > 0 && !sstableMetadata.estimatedRowSize.isOverflowed()
 +                                 ? histogramCount
 +                                 : estimateRowsFromIndex(primaryIndex); // statistics is
supposed to be optional
 +
 +            try(IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys,
metadata.getMinIndexInterval(), samplingLevel))
 +            {
 +
 +                if (recreateBloomFilter)
 +                    bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(),
true);
 +
 +                long indexPosition;
 +                while ((indexPosition = primaryIndex.getFilePointer()) != indexSize)
 +                {
 +                    ByteBuffer key = ByteBufferUtil.readWithShortLength(primaryIndex);
 +                    RowIndexEntry indexEntry = metadata.comparator.rowIndexEntrySerializer().deserialize(primaryIndex,
descriptor.version);
 +                    DecoratedKey decoratedKey = partitioner.decorateKey(key);
 +                    if (first == null)
 +                        first = decoratedKey;
 +                    last = decoratedKey;
 +
 +                    if (recreateBloomFilter)
 +                        bf.add(decoratedKey.getKey());
 +
 +                    // if summary was already read from disk we don't want to re-populate
it using primary index
 +                    if (!summaryLoaded)
 +                    {
 +                        summaryBuilder.maybeAddEntry(decoratedKey, indexPosition);
 +                        ibuilder.addPotentialBoundary(indexPosition);
 +                        dbuilder.addPotentialBoundary(indexEntry.position);
 +                    }
 +                }
 +
 +                if (!summaryLoaded)
 +                    indexSummary = summaryBuilder.build(partitioner);
 +            }
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(primaryIndex);
 +        }
 +
 +        first = getMinimalKey(first);
 +        last = getMinimalKey(last);
 +    }
 +
 +    /**
 +     * Load index summary from Summary.db file if it exists.
 +     *
 +     * if loaded index summary has different index interval from current value stored in
schema,
 +     * then Summary.db file will be deleted and this returns false to rebuild summary.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     * @return true if index summary is loaded successfully from Summary.db file.
 +     */
 +    public boolean loadSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            indexSummary = IndexSummary.serializer.deserialize(
 +                    iStream, partitioner, descriptor.version.hasSamplingLevel,
 +                    metadata.getMinIndexInterval(), metadata.getMaxIndexInterval());
 +            first = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            if (indexSummary != null)
 +                indexSummary.close();
 +            logger.debug("Cannot deserialize SSTable Summary File {}: {}", summariesFile.getPath(),
e.getMessage());
 +            // corrupted; delete it and fall back to creating a new summary
 +            FileUtils.closeQuietly(iStream);
 +            // delete it and fall back to creating a new summary
 +            FileUtils.deleteWithConfirm(summariesFile);
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Validates that an index summary has full sampling, as expected when the serialization
format does not support
 +     * persisting the sampling level.
 +     * @return true if the summary has full sampling, false otherwise
 +     */
 +    private boolean validateSummarySamplingLevel()
 +    {
 +        // We need to check index summary entries against the index to verify that none
of them were dropped due to
 +        // downsampling.  Downsampling can drop any of the first BASE_SAMPLING_LEVEL entries
(repeating that drop pattern
 +        // for the remainder of the summary).  Unfortunately, the first entry to be dropped
is the entry at
 +        // index (BASE_SAMPLING_LEVEL - 1), so we need to check a full set of BASE_SAMPLING_LEVEL
entries.
++        if (ifile == null)
++            return false;
++
 +        Iterator<FileDataInput> segments = ifile.iterator(0);
 +        int i = 0;
 +        int summaryEntriesChecked = 0;
 +        int expectedIndexInterval = getMinIndexInterval();
 +        while (segments.hasNext())
 +        {
 +            FileDataInput in = segments.next();
 +            try
 +            {
 +                while (!in.isEOF())
 +                {
 +                    ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);
 +                    if (i % expectedIndexInterval == 0)
 +                    {
 +                        ByteBuffer summaryKey = ByteBuffer.wrap(indexSummary.getKey(i /
expectedIndexInterval));
 +                        if (!summaryKey.equals(indexKey))
 +                            return false;
 +                        summaryEntriesChecked++;
 +
 +                        if (summaryEntriesChecked == Downsampling.BASE_SAMPLING_LEVEL)
 +                            return true;
 +                    }
 +                    RowIndexEntry.Serializer.skip(in);
 +                    i++;
 +                }
 +            }
 +            catch (IOException e)
 +            {
 +                markSuspect();
 +                throw new CorruptSSTableException(e, in.getPath());
 +            }
 +            finally
 +            {
 +                FileUtils.closeQuietly(in);
 +            }
 +        }
 +
 +        return true;
 +    }
 +
 +    /**
 +     * Save index summary to Summary.db file.
 +     *
 +     * @param ibuilder
 +     * @param dbuilder
 +     */
 +    public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        saveSummary(ibuilder, dbuilder, indexSummary);
 +    }
 +
 +    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder,
IndexSummary summary)
 +    {
 +        File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            FileUtils.deleteWithConfirm(summariesFile);
 +
 +        DataOutputStreamAndChannel oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStreamAndChannel(new FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(summary, oStream, descriptor.version.hasSamplingLevel);
 +            ByteBufferUtil.writeWithLength(first.getKey(), oStream);
 +            ByteBufferUtil.writeWithLength(last.getKey(), oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // corrupted hence delete it and let it load it now.
 +            if (summariesFile.exists())
 +                FileUtils.deleteWithConfirm(summariesFile);
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
 +    }
 +
 +    public void setReplacedBy(SSTableReader replacement)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert replacement != null;
 +            assert !tidy.isReplaced;
 +            tidy.isReplaced = true;
 +        }
 +    }
 +
 +    public boolean isReplaced()
 +    {
 +        return tidy.isReplaced;
 +    }
 +
 +    /**
 +     * Clone this reader with the provided start and open reason, and set the clone as replacement.
 +     *
 +     * @param newFirst the first key for the replacement (which can be different from the
original due to the pre-emptive
 +     * opening of compaction results).
 +     * @param reason the {@code OpenReason} for the replacement.
 +     *
 +     * @return the cloned reader. That reader is set as a replacement by the method.
 +     */
 +    private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason)
 +    {
 +        return cloneAndReplace(newFirst, reason, indexSummary.sharedCopy());
 +    }
 +
 +    /**
 +     * Clone this reader with the new values and set the clone as replacement.
 +     *
 +     * @param newFirst the first key for the replacement (which can be different from the
original due to the pre-emptive
 +     * opening of compaction results).
 +     * @param reason the {@code OpenReason} for the replacement.
 +     * @param newSummary the index summary for the replacement.
 +     *
 +     * @return the cloned reader. That reader is set as a replacement by the method.
 +     */
 +    private SSTableReader cloneAndReplace(DecoratedKey newFirst, OpenReason reason, IndexSummary
newSummary)
 +    {
 +        SSTableReader replacement = internalOpen(descriptor,
 +                                                 components,
 +                                                 metadata,
 +                                                 partitioner,
-                                                  ifile.sharedCopy(),
++                                                 ifile != null ? ifile.sharedCopy() : null,
 +                                                 dfile.sharedCopy(),
 +                                                 newSummary,
 +                                                 bf.sharedCopy(),
 +                                                 maxDataAge,
 +                                                 sstableMetadata,
 +                                                 reason);
 +        replacement.first = newFirst;
 +        replacement.last = last;
 +        replacement.isSuspect.set(isSuspect.get());
 +        setReplacedBy(replacement);
 +        return replacement;
 +    }
 +
 +    // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain
a reference chain to this reader
 +    public SSTableReader cloneWithNewStart(DecoratedKey newStart, final Runnable runOnClose)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +            // TODO: merge with caller's firstKeyBeyond() work,to save time
 +            if (newStart.compareTo(first) > 0)
 +            {
 +                final long dataStart = getPosition(newStart, Operator.EQ).position;
 +                final long indexStart = getIndexScanPosition(newStart);
 +                this.tidy.runOnClose = new DropPageCache(dfile, dataStart, ifile, indexStart,
runOnClose);
 +            }
 +
 +            return cloneAndReplace(newStart, OpenReason.MOVED_START);
 +        }
 +    }
 +
 +    // runOnClose must NOT be an anonymous or non-static inner class, nor must it retain
a reference chain to this reader
 +    public SSTableReader cloneAsShadowed(final Runnable runOnClose)
 +    {
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +            this.tidy.runOnClose = new DropPageCache(dfile, 0, ifile, 0, runOnClose);
 +            return cloneAndReplace(first, OpenReason.SHADOWED);
 +        }
 +    }
 +
 +    private static class DropPageCache implements Runnable
 +    {
 +        final SegmentedFile dfile;
 +        final long dfilePosition;
 +        final SegmentedFile ifile;
 +        final long ifilePosition;
 +        final Runnable andThen;
 +
 +        private DropPageCache(SegmentedFile dfile, long dfilePosition, SegmentedFile ifile,
long ifilePosition, Runnable andThen)
 +        {
 +            this.dfile = dfile;
 +            this.dfilePosition = dfilePosition;
 +            this.ifile = ifile;
 +            this.ifilePosition = ifilePosition;
 +            this.andThen = andThen;
 +        }
 +
 +        public void run()
 +        {
 +            dfile.dropPageCache(dfilePosition);
-             ifile.dropPageCache(ifilePosition);
++            if (ifile != null)
++                ifile.dropPageCache(ifilePosition);
 +            andThen.run();
 +        }
 +    }
 +
 +    /**
 +     * Returns a new SSTableReader with the same properties as this SSTableReader except
that a new IndexSummary will
 +     * be built at the target samplingLevel.  This (original) SSTableReader instance will
be marked as replaced, have
 +     * its DeletingTask removed, and have its periodic read-meter sync task cancelled.
 +     * @param samplingLevel the desired sampling level for the index summary on the new
SSTableReader
 +     * @return a new SSTableReader
 +     * @throws IOException
 +     */
 +    public SSTableReader cloneWithNewSummarySamplingLevel(ColumnFamilyStore parent, int
samplingLevel) throws IOException
 +    {
 +        assert descriptor.version.hasSamplingLevel;
 +
 +        synchronized (tidy.global)
 +        {
 +            assert openReason != OpenReason.EARLY;
 +
 +            int minIndexInterval = metadata.getMinIndexInterval();
 +            int maxIndexInterval = metadata.getMaxIndexInterval();
 +            double effectiveInterval = indexSummary.getEffectiveIndexInterval();
 +
 +            IndexSummary newSummary;
 +            long oldSize = bytesOnDisk();
 +
 +            // We have to rebuild the summary from the on-disk primary index in three cases:
 +            // 1. The sampling level went up, so we need to read more entries off disk
 +            // 2. The min_index_interval changed (in either direction); this changes what
entries would be in the summary
 +            //    at full sampling (and consequently at any other sampling level)
 +            // 3. The max_index_interval was lowered, forcing us to raise the sampling level
 +            if (samplingLevel > indexSummary.getSamplingLevel() || indexSummary.getMinIndexInterval()
!= minIndexInterval || effectiveInterval > maxIndexInterval)
 +            {
 +                newSummary = buildSummaryAtLevel(samplingLevel);
 +            }
 +            else if (samplingLevel < indexSummary.getSamplingLevel())
 +            {
 +                // we can use the existing index summary to make a smaller one
 +                newSummary = IndexSummaryBuilder.downsample(indexSummary, samplingLevel,
minIndexInterval, partitioner);
 +
 +                SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode());
 +                SegmentedFile.Builder dbuilder = compression
 +                                                 ? SegmentedFile.getCompressedBuilder()
 +                                                 : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 +                saveSummary(ibuilder, dbuilder, newSummary);
 +            }
 +            else
 +            {
 +                throw new AssertionError("Attempted to clone SSTableReader with the same
index summary sampling level and " +
 +                                         "no adjustments to min/max_index_interval");
 +            }
 +
 +            long newSize = bytesOnDisk();
 +            StorageMetrics.load.inc(newSize - oldSize);
 +            parent.metric.liveDiskSpaceUsed.inc(newSize - oldSize);
 +
 +            return cloneAndReplace(first, OpenReason.METADATA_CHANGE, newSummary);
 +        }
 +    }
 +
 +    private IndexSummary buildSummaryAtLevel(int newSamplingLevel) throws IOException
 +    {
 +        // we read the positions in a BRAF so we don't have to worry about an entry spanning
a mmap boundary.
 +        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
          try
          {
              long indexSize = primaryIndex.length();
@@@ -1566,16 -974,22 +1576,19 @@@
          {
              if (op == Operator.EQ && updateCacheAndStats)
                  bloomFilterTracker.addFalsePositive();
 -            // we matched the -1th position: if the operator might match forward, we'll
start at the first
 -            // position. We however need to return the correct index entry for that first
position.
 -            if (op.apply(1) >= 0)
 -            {
 -                sampledPosition = 0;
 -            }
 -            else
 -            {
 -                Tracing.trace("Partition summary allows skipping sstable {}", descriptor.generation);
 -                return null;
 -            }
 +            Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
 +            return null;
          }
  
 +        int binarySearchResult = indexSummary.binarySearch(key);
 +        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult,
indexSummary);
 +        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);
 +
 +        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);
 +
+         if (ifile == null)
+             return null;
+ 
          // scan the on-disk index, starting at the nearest sampled position.
          // The check against IndexInterval is to be exit the loop in the EQ case when the
key looked for is not present
          // (bloom filter false positive). But note that for non-EQ cases, we might need
to check the first key of the
@@@ -1670,11 -1084,13 +1683,14 @@@
       */
      public DecoratedKey firstKeyBeyond(RowPosition token)
      {
 +        if (token.compareTo(first) < 0)
 +            return first;
 +
          long sampledPosition = getIndexScanPosition(token);
 -        if (sampledPosition == -1)
 -            sampledPosition = 0;
  
+         if (ifile == null)
+             return null;
+ 
          Iterator<FileDataInput> segments = ifile.iterator(sampledPosition);
          while (segments.hasNext())
          {
@@@ -1948,8 -1391,7 +1964,10 @@@
      {
          try
          {
 -            return SSTableMetadata.serializer.deserialize(descriptor).right;
 +            CompactionMetadata compactionMetadata = (CompactionMetadata) descriptor.getMetadataSerializer().deserialize(descriptor,
MetadataType.COMPACTION);
-             return compactionMetadata.ancestors;
++            if (compactionMetadata != null)
++                return compactionMetadata.ancestors;
++            return Collections.emptySet();
          }
          catch (IOException e)
          {
@@@ -1995,7 -1441,7 +2013,9 @@@
  
      public RandomAccessReader openIndexReader()
      {
-         return ifile.createReader();
 -        return RandomAccessReader.open(new File(getIndexFilename()));
++        if (ifile != null)
++            return ifile.createReader();
++        return null;
      }
  
      /**
@@@ -2024,152 -1470,73 +2044,154 @@@
      }
  
      /**
 -     * @param sstables
 -     * @return true if all desired references were acquired.  Otherwise, it will unreference
any partial acquisition, and return false.
 +     * Increment the total row read count and read rate for this SSTable.  This should not
be incremented for range
 +     * slice queries, row cache hits, or non-query reads, like compaction.
       */
 -    public static boolean acquireReferences(Iterable<SSTableReader> sstables)
 +    public void incrementReadCount()
      {
 -        SSTableReader failed = null;
 -        for (SSTableReader sstable : sstables)
 +        if (readMeter != null)
 +            readMeter.mark();
 +    }
 +
 +    public static class SizeComparator implements Comparator<SSTableReader>
 +    {
 +        public int compare(SSTableReader o1, SSTableReader o2)
          {
 -            if (!sstable.acquireReference())
 -            {
 -                failed = sstable;
 -                break;
 -            }
 +            return Longs.compare(o1.onDiskLength(), o2.onDiskLength());
          }
 +    }
  
 -        if (failed == null)
 -            return true;
 +    public Ref<SSTableReader> tryRef()
 +    {
 +        return selfRef.tryRef();
 +    }
  
 -        for (SSTableReader sstable : sstables)
 -        {
 -            if (sstable == failed)
 -                break;
 -            sstable.releaseReference();
 -        }
 -        return false;
 +    public Ref<SSTableReader> selfRef()
 +    {
 +        return selfRef;
      }
  
 -    public static void releaseReferences(Iterable<SSTableReader> sstables)
 +    public Ref<SSTableReader> ref()
      {
 -        for (SSTableReader sstable : sstables)
 -        {
 -            sstable.releaseReference();
 -        }
 +        return selfRef.ref();
      }
  
 -    private void dropPageCache()
 +    void setup(boolean isOffline)
      {
 -        dropPageCache(dfile.path);
 -        if (null != ifile)
 -            dropPageCache(ifile.path);
 +        tidy.setup(this, isOffline);
 +        this.readMeter = tidy.global.readMeter;
      }
  
 -    private void dropPageCache(String filePath)
 +    @VisibleForTesting
 +    public void overrideReadMeter(RestorableMeter readMeter)
      {
 -        RandomAccessFile file = null;
 +        this.readMeter = tidy.global.readMeter = readMeter;
 +    }
  
 -        try
 +    /**
 +     * One instance per SSTableReader we create. This references the type-shared tidy, which
in turn references
 +     * the globally shared tidy, i.e.
 +     *
 +     * InstanceTidier => DescriptorTypeTitdy => GlobalTidy
 +     *
 +     * We can create many InstanceTidiers (one for every time we reopen an sstable with
MOVED_START for example), but there can only be
 +     * two DescriptorTypeTidy (FINAL and TEMPLINK) and only one GlobalTidy for one single
logical sstable.
 +     *
 +     * When the InstanceTidier cleansup, it releases its reference to its DescriptorTypeTidy;
when all InstanceTidiers
 +     * for that type have run, the DescriptorTypeTidy cleansup. DescriptorTypeTidy behaves
in the same way towards GlobalTidy.
 +     *
 +     * For ease, we stash a direct reference to both our type-shared and global tidier
 +     */
 +    private static final class InstanceTidier implements Tidy
 +    {
 +        private final Descriptor descriptor;
 +        private final CFMetaData metadata;
 +        private IFilter bf;
 +        private IndexSummary summary;
 +
 +        private SegmentedFile dfile;
 +        private SegmentedFile ifile;
 +        private Runnable runOnClose;
 +        private boolean isReplaced = false;
 +
 +        // a reference to our shared per-Descriptor.Type tidy instance, that
 +        // we will release when we are ourselves released
 +        private Ref<DescriptorTypeTidy> typeRef;
 +
 +        // a convenience stashing of the shared per-descriptor-type tidy instance itself
 +        // and the per-logical-sstable globally shared state that it is linked to
 +        private DescriptorTypeTidy type;
 +        private GlobalTidy global;
 +
 +        private boolean setup;
 +
 +        void setup(SSTableReader reader, boolean isOffline)
 +        {
 +            this.setup = true;
 +            this.bf = reader.bf;
 +            this.summary = reader.indexSummary;
 +            this.dfile = reader.dfile;
 +            this.ifile = reader.ifile;
 +            // get a new reference to the shared descriptor-type tidy
 +            this.typeRef = DescriptorTypeTidy.get(reader);
 +            this.type = typeRef.get();
 +            this.global = type.globalRef.get();
 +            if (!isOffline)
 +                global.ensureReadMeter();
 +        }
 +
 +        InstanceTidier(Descriptor descriptor, CFMetaData metadata)
          {
 -            file = new RandomAccessFile(filePath, "r");
 +            this.descriptor = descriptor;
 +            this.metadata = metadata;
 +        }
  
 -            int fd = CLibrary.getfd(file.getFD());
 +        public void tidy()
 +        {
 +            // don't try to cleanup if the sstablereader was never fully constructed
 +            if (!setup)
 +                return;
  
 -            if (fd > 0)
 +            final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
 +            final OpOrder.Barrier barrier;
 +            if (cfs != null)
              {
 -                if (logger.isDebugEnabled())
 -                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
 -
 -                CLibrary.trySkipCache(fd, 0, 0);
 +                barrier = cfs.readOrdering.newBarrier();
 +                barrier.issue();
              }
 +            else
 +                barrier = null;
 +
 +            ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
 +            {
 +                public void run()
 +                {
 +                    if (barrier != null)
 +                        barrier.await();
-                     bf.close();
++                    if (bf != null)
++                        bf.close();
 +                    if (summary != null)
 +                        summary.close();
 +                    if (runOnClose != null)
 +                        runOnClose.run();
 +                    dfile.close();
-                     ifile.close();
++                    if (ifile != null)
++                        ifile.close();
 +                    typeRef.release();
 +                }
 +            });
          }
 -        catch (IOException e)
 +
 +        public String name()
          {
 -            // we don't care if cache cleanup fails
 +            return descriptor.toString();
          }
 -        finally
 +
 +        void releaseSummary()
          {
 -            FileUtils.closeQuietly(file);
 +            summary.close();
 +            assert summary.isCleanedUp();
 +            summary = null;
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4c94ef20/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------


Mime
View raw message