cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [02/16] cassandra git commit: Fix Mmapped File Boundaries
Date Wed, 30 Sep 2015 18:48:20 GMT
Fix Mmapped File Boundaries

This patch fixes two bugs with mmap segment boundary
tracking, and introduces automated repair, on startup,
of boundaries that were saved incorrectly because of these bugs.

patch by benedict; reviewed by tjake for CASSANDRA-10357


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c37562e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c37562e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c37562e3

Branch: refs/heads/cassandra-2.2
Commit: c37562e345c24720c55428a8644191df68319812
Parents: f6cab37
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Wed Sep 16 18:09:32 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Wed Sep 30 19:45:49 2015 +0100

----------------------------------------------------------------------
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  15 +
 .../cassandra/io/sstable/SSTableReader.java     |  34 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   6 +
 .../io/sstable/SSTableSimpleWriter.java         |  12 +
 .../cassandra/io/util/MappedFileDataInput.java  |   8 +-
 .../cassandra/io/util/MmappedSegmentedFile.java | 270 +++++++++++++---
 .../apache/cassandra/io/util/SegmentedFile.java |   1 +
 .../sstable/LongSegmentedFileBoundaryTest.java  | 322 +++++++++++++++++++
 9 files changed, 601 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 165a4b2..557c3de 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -200,4 +200,6 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
     protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws
IOException;
 
     protected abstract ColumnFamily getColumnFamily() throws IOException;
+
+    public abstract Descriptor getCurrentDescriptor();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index b211a90..c364171 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -271,6 +271,16 @@ public class CQLSSTableWriter implements Closeable
         writer.close();
     }
 
+    public Descriptor getCurrentDescriptor()
+    {
+        return writer.getCurrentDescriptor();
+    }
+
+    public CFMetaData getCFMetaData()
+    {
+        return writer.metadata;
+    }
+
     /**
      * A Builder for a CQLSSTableWriter object.
      */
@@ -366,6 +376,11 @@ public class CQLSSTableWriter implements Closeable
             }
         }
 
+        CFMetaData metadata()
+        {
+            return schema;
+        }
+
         /**
          * Adds the specified column family to the specified keyspace.
          *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0f307b0..84add6f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -17,13 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
@@ -70,20 +64,14 @@ import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.CompressedSegmentedFile;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.ICompressedFile;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -162,6 +150,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
 public class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
+    private static final int ACCURATE_BOUNDARIES_MAGIC_NUMBER = 248923458;
 
     private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
     static
@@ -892,6 +881,19 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
             last = partitioner.decorateKey(ByteBufferUtil.readWithLength(iStream));
             ibuilder.deserializeBounds(iStream);
             dbuilder.deserializeBounds(iStream);
+
+            boolean checkForRepair = true;
+            try
+            {
+                int v = iStream.readInt();
+                // check for our magic number, indicating this summary has been sampled correctly
+                checkForRepair = v != ACCURATE_BOUNDARIES_MAGIC_NUMBER;
+            }
+            catch (Throwable t) {}
+
+            // fix CASSANDRA-10357 on-the-fly
+            if (checkForRepair && MmappedSegmentedFile.maybeRepair(metadata, descriptor,
indexSummary, ibuilder, dbuilder))
+                saveSummary(ibuilder, dbuilder);
         }
         catch (IOException e)
         {
@@ -992,6 +994,8 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
             ByteBufferUtil.writeWithLength(last.getKey(), oStream);
             ibuilder.serializeBounds(oStream);
             dbuilder.serializeBounds(oStream);
+            // write a magic number, to indicate this summary has been sampled correctly
+            oStream.writeInt(ACCURATE_BOUNDARIES_MAGIC_NUMBER);
         }
         catch (IOException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 9ee9ea1..25ec354 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -140,6 +140,12 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         return previous;
     }
 
+    public Descriptor getCurrentDescriptor()
+    {
+        // can be implemented, but isn't necessary
+        throw new UnsupportedOperationException();
+    }
+
     protected ColumnFamily createColumnFamily() throws IOException
     {
         return ArrayBackedSortedColumns.factory.create(metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 87c8e33..23da501 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -65,6 +65,13 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
         writer = getWriter();
     }
 
+    SSTableReader closeAndOpenReader()
+    {
+        if (currentKey != null)
+            writeRow(currentKey, columnFamily);
+        return writer.closeAndOpenReader();
+    }
+
     public void close()
     {
         try
@@ -89,4 +96,9 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
     {
         return ArrayBackedSortedColumns.factory.create(metadata);
     }
+
+    public Descriptor getCurrentDescriptor()
+    {
+        return writer.descriptor;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
index d056240..f93ce72 100644
--- a/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
@@ -51,12 +51,18 @@ public class MappedFileDataInput extends AbstractDataInput implements
FileDataIn
     public void seek(long pos) throws IOException
     {
         long inSegmentPos = pos - segmentOffset;
-        if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
+        if (!contains(pos))
             throw new IOException(String.format("Seek position %d is not within mmap segment
(seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
 
         seekInternal((int) inSegmentPos);
     }
 
+    public boolean contains(long pos)
+    {
+        long inSegmentPos = pos - segmentOffset;
+        return inSegmentPos >= 0 && inSegmentPos < buffer.capacity();
+    }
+
     public long getFilePointer()
     {
         return segmentOffset + (long)position;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 1b23343..623f65a 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -24,11 +24,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class MmappedSegmentedFile extends SegmentedFile
@@ -135,52 +141,220 @@ public class MmappedSegmentedFile extends SegmentedFile
         }
     }
 
+    // see CASSANDRA-10357
+    public static boolean maybeRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary
indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    {
+        boolean mayNeedRepair = false;
+        if (ibuilder instanceof Builder)
+            mayNeedRepair = ((Builder) ibuilder).mayNeedRepair(descriptor.filenameFor(Component.PRIMARY_INDEX));
+        if (dbuilder instanceof Builder)
+            mayNeedRepair |= ((Builder) dbuilder).mayNeedRepair(descriptor.filenameFor(Component.DATA));
+
+        if (mayNeedRepair)
+            forceRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder);
+        return mayNeedRepair;
+    }
+
+    // if one of the index/data files have boundaries larger than we can mmap, and they were
written by a version that did not guarantee correct boundaries were saved,
+    // rebuild the boundaries and save them again
+    private static void forceRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary
indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    {
+        if (ibuilder instanceof Builder)
+            ((Builder) ibuilder).boundaries.clear();
+        if (dbuilder instanceof Builder)
+            ((Builder) dbuilder).boundaries.clear();
+
+        try (RandomAccessFile raf = new RandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX),
"r");)
+        {
+            long iprev = 0, dprev = 0;
+            for (int i = 0; i < indexSummary.size(); i++)
+            {
+                // first read the position in the summary, and read the corresponding position
in the data file
+                long icur = indexSummary.getPosition(i);
+                raf.seek(icur);
+                ByteBufferUtil.readWithShortLength(raf);
+                RowIndexEntry rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf,
descriptor.version);
+                long dcur = rie.position;
+
+                // if these positions are small enough to map out a segment from the prior
version (i.e. less than 2Gb),
+                // just add these as a boundary and proceed to the next index summary record;
most scenarios will be
+                // served by this, keeping the cost of rebuild to a minimum.
+
+                if (Math.max(icur - iprev , dcur - dprev) > MAX_SEGMENT_SIZE)
+                {
+                    // otherwise, loop over its index block, providing each RIE as a potential
boundary for both files
+                    raf.seek(iprev);
+                    while (raf.getFilePointer() < icur)
+                    {
+                        // add the position of this record in the index file as an index
file boundary
+                        ibuilder.addPotentialBoundary(raf.getFilePointer());
+                        // then read the RIE, and add its data file position as a boundary
for the data file
+                        ByteBufferUtil.readWithShortLength(raf);
+                        rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf,
descriptor.version);
+                        dbuilder.addPotentialBoundary(rie.position);
+                    }
+                }
+
+                ibuilder.addPotentialBoundary(icur);
+                dbuilder.addPotentialBoundary(dcur);
+
+                iprev = icur;
+                dprev = dcur;
+            }
+        }
+        catch (IOException e)
+        {
+            logger.error("Failed to recalculate boundaries for {}; mmap access may degrade
to buffered for this file", descriptor);
+        }
+    }
+
     /**
      * Overrides the default behaviour to create segments of a maximum size.
      */
-    static class Builder extends SegmentedFile.Builder
+    public static class Builder extends SegmentedFile.Builder
     {
-        // planned segment boundaries
-        private List<Long> boundaries;
+        @VisibleForTesting
+        public static class Boundaries
+        {
+            private long[] boundaries;
+
+            // number of boundaries we have "fixed" (i.e. have determined the final value
of)
+            private int fixedCount;
+
+            public Boundaries()
+            {
+                // we always have a boundary of zero, so we start with a fixedCount of 1
+                this(new long[8], 1);
+            }
+
+            public Boundaries(long[] boundaries, int fixedCount)
+            {
+                init(boundaries, fixedCount);
+            }
+
+            void init(long[] boundaries, int fixedCount)
+            {
+                this.boundaries = boundaries;
+                this.fixedCount = fixedCount;
+            }
+
+            public void addCandidate(long candidate)
+            {
+                // we make sure we have room before adding another element, so that we can
share the addCandidate logic statically
+                boundaries = ensureCapacity(boundaries, fixedCount);
+                fixedCount = addCandidate(boundaries, fixedCount, candidate);
+            }
+
+            private static int addCandidate(long[] boundaries, int fixedCount, long candidate)
+            {
+                // check how far we are from the last fixed boundary
+                long delta = candidate - boundaries[fixedCount - 1];
+                assert delta >= 0;
+                if (delta != 0)
+                {
+                    if (delta <= MAX_SEGMENT_SIZE)
+                        // overwrite the unfixed (potential) boundary if the resultant segment
would still be mmappable
+                        boundaries[fixedCount] = candidate;
+                    else if (boundaries[fixedCount] == 0)
+                        // or, if it is not initialised, we cannot make an mmapped segment
here, so this is the fixed boundary
+                        boundaries[fixedCount++] = candidate;
+                    else
+                        // otherwise, fix the prior boundary and initialise our unfixed boundary
+                        boundaries[++fixedCount] = candidate;
+                }
+                return fixedCount;
+            }
+
+            // ensures there is room for another fixed boundary AND an unfixed candidate
boundary, i.e. fixedCount + 2 items
+            private static long[] ensureCapacity(long[] boundaries, int fixedCount)
+            {
+                if (fixedCount + 1 >= boundaries.length)
+                    return Arrays.copyOf(boundaries, boundaries.length * 2);
+                return boundaries;
+            }
+
+            void clear()
+            {
+                fixedCount = 1;
+                Arrays.fill(boundaries, 0);
+            }
+
+            // returns the fixed boundaries, truncated to a correctly sized long[]
+            public long[] truncate()
+            {
+                return Arrays.copyOf(boundaries, fixedCount);
+            }
 
-        // offset of the open segment (first segment begins at 0).
-        private long currentStart = 0;
+            // returns the finished boundaries for the provided length, truncated to a correctly
sized long[]
+            public long[] finish(long length, boolean isFinal)
+            {
+                assert length > 0;
+                // ensure there's room for the length to be added
+                boundaries = ensureCapacity(boundaries, fixedCount);
+
+                // clone our current contents, so we don't corrupt them
+                int fixedCount = this.fixedCount;
+                long[] boundaries = this.boundaries.clone();
+
+                // if we're finishing early, our length may be before some of our boundaries,
+                // so walk backwards until our boundaries are <= length
+                while (boundaries[fixedCount - 1] >= length)
+                    boundaries[fixedCount--] = 0;
+                if (boundaries[fixedCount] >= length)
+                    boundaries[fixedCount] = 0;
+
+                // add our length as a boundary
+                fixedCount = addCandidate(boundaries, fixedCount, length);
+
+                // if we have any unfixed boundary at the end, it's now fixed, since we're
done
+                if (boundaries[fixedCount] != 0)
+                    fixedCount++;
+
+                boundaries = Arrays.copyOf(boundaries, fixedCount);
+                if (isFinal)
+                {
+                    // if this is the final one, save it
+                    this.boundaries = boundaries;
+                    this.fixedCount = fixedCount;
+                }
+                return boundaries;
+            }
+        }
 
-        // current length of the open segment.
-        // used to allow merging multiple too-large-to-mmap segments, into a single buffered
segment.
-        private long currentSize = 0;
+        private final Boundaries boundaries = new Boundaries();
 
         public Builder()
         {
             super();
-            boundaries = new ArrayList<>();
-            boundaries.add(0L);
         }
 
-        public void addPotentialBoundary(long boundary)
+        public long[] boundaries()
         {
-            if (boundary - currentStart <= MAX_SEGMENT_SIZE)
-            {
-                // boundary fits into current segment: expand it
-                currentSize = boundary - currentStart;
-                return;
-            }
+            return boundaries.truncate();
+        }
 
-            // close the current segment to try and make room for the boundary
-            if (currentSize > 0)
-            {
-                currentStart += currentSize;
-                boundaries.add(currentStart);
-            }
-            currentSize = boundary - currentStart;
+        // indicates if we may need to repair the mmapped file boundaries. this is a cheap
check to see if there
+        // are any spans larger than an mmap segment size, which should be rare to occur
in practice.
+        boolean mayNeedRepair(String path)
+        {
+            // old boundaries were created without the length, so add it as a candidate
+            long length = new File(path).length();
+            boundaries.addCandidate(length);
+            long[] boundaries = this.boundaries.truncate();
 
-            // if we couldn't make room, the boundary needs its own segment
-            if (currentSize > MAX_SEGMENT_SIZE)
+            long prev = 0;
+            for (long boundary : boundaries)
             {
-                currentStart = boundary;
-                boundaries.add(currentStart);
-                currentSize = 0;
+                if (boundary - prev > MAX_SEGMENT_SIZE)
+                    return true;
+                prev = boundary;
             }
+            return false;
+        }
+
+        public void addPotentialBoundary(long boundary)
+        {
+            boundaries.addCandidate(boundary);
         }
 
         public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
@@ -188,10 +362,10 @@ public class MmappedSegmentedFile extends SegmentedFile
             assert !isFinal || overrideLength <= 0;
             long length = overrideLength > 0 ? overrideLength : new File(path).length();
             // create the segments
-            return new MmappedSegmentedFile(path, length, createSegments(path, length));
+            return new MmappedSegmentedFile(path, length, createSegments(path, length, isFinal));
         }
 
-        private Segment[] createSegments(String path, long length)
+        private Segment[] createSegments(String path, long length, boolean isFinal)
         {
             RandomAccessFile raf;
             try
@@ -203,27 +377,17 @@ public class MmappedSegmentedFile extends SegmentedFile
                 throw new RuntimeException(e);
             }
 
-            // if we're early finishing a range that doesn't span multiple segments, but
the finished file now does,
-            // we remove these from the end (we loop incase somehow this spans multiple segments,
but that would
-            // be a loco dataset
-            while (length < boundaries.get(boundaries.size() - 1))
-                boundaries.remove(boundaries.size() -1);
-
-            // add a sentinel value == length
-            List<Long> boundaries = new ArrayList<>(this.boundaries);
-            if (length != boundaries.get(boundaries.size() - 1))
-                boundaries.add(length);
-
+            long[] boundaries = this.boundaries.finish(length, isFinal);
 
-            int segcount = boundaries.size() - 1;
+            int segcount = boundaries.length - 1;
             Segment[] segments = new Segment[segcount];
 
             try
             {
                 for (int i = 0; i < segcount; i++)
                 {
-                    long start = boundaries.get(i);
-                    long size = boundaries.get(i + 1) - start;
+                    long start = boundaries[i];
+                    long size = boundaries[i + 1] - start;
                     MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
                                                ? raf.getChannel().map(FileChannel.MapMode.READ_ONLY,
start, size)
                                                : null;
@@ -245,9 +409,10 @@ public class MmappedSegmentedFile extends SegmentedFile
         public void serializeBounds(DataOutput out) throws IOException
         {
             super.serializeBounds(out);
-            out.writeInt(boundaries.size());
-            for (long position: boundaries)
-                out.writeLong(position);
+            long[] boundaries = this.boundaries.truncate();
+            out.writeInt(boundaries.length);
+            for (long boundary : boundaries)
+                out.writeLong(boundary);
         }
 
         @Override
@@ -256,12 +421,11 @@ public class MmappedSegmentedFile extends SegmentedFile
             super.deserializeBounds(in);
 
             int size = in.readInt();
-            List<Long> temp = new ArrayList<>(size);
-            
+            long[] boundaries = new long[size];
             for (int i = 0; i < size; i++)
-                temp.add(in.readLong());
+                boundaries[i] = in.readLong();
 
-            boundaries = temp;
+            this.boundaries.init(boundaries, size);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index c65ecbf..23454bc 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 
 import com.google.common.util.concurrent.RateLimiter;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
new file mode 100644
index 0000000..e17c6a7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
@@ -0,0 +1,322 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
+import org.apache.cassandra.io.util.MmappedSegmentedFile.Builder.Boundaries;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class LongSegmentedFileBoundaryTest
+{
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        SchemaLoader.cleanupAndLeaveDirs();
+        Keyspace.setInitialized();
+        StorageService.instance.initServer();
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        Config.setClientMode(false);
+    }
+
+    @Test
+    public void testRandomBoundaries()
+    {
+        long[] candidates = new long[1 + (1 << 16)];
+        int[] indexesToCheck = new int[1 << 8];
+        Random random = new Random();
+
+        for (int run = 0; run < 100; run++)
+        {
+
+            long seed = random.nextLong();
+            random.setSeed(seed);
+            System.out.println("Seed: " + seed);
+
+            // at least 1Ki, and as many as 256Ki, boundaries
+            int candidateCount = (1 + random.nextInt(candidates.length >> 10)) <<
10;
+            generateBoundaries(random, candidateCount, candidates, indexesToCheck);
+
+            Boundaries builder = new Boundaries();
+            int nextIndexToCheck = indexesToCheck[0];
+            int checkCount = 0;
+            System.out.printf("[0..%d)", candidateCount);
+            for (int i = 1; i < candidateCount - 1; i++)
+            {
+                if (i == nextIndexToCheck)
+                {
+                    if (checkCount % 20 == 0)
+                        System.out.printf(" %d", i);
+                    // grow number of samples logarithmically; work will still increase superlinearly,
as size of dataset grows linearly
+                    int sampleCount = 1 << (31 - Integer.numberOfLeadingZeros(++checkCount));
+                    checkBoundarySample(random, candidates, i, sampleCount, builder);
+                    // select out next index to check (there may be dups, so skip them)
+                    while ((nextIndexToCheck = checkCount == indexesToCheck.length ? candidateCount
: indexesToCheck[checkCount]) == i)
+                        checkCount++;
+                }
+
+                builder.addCandidate(candidates[i]);
+            }
+            System.out.println();
+            checkBoundaries(candidates, candidateCount - 1, builder, candidates[candidateCount
- 1]);
+            Assert.assertEquals(candidateCount, nextIndexToCheck);
+        }
+    }
+
+    private static void generateBoundaries(Random random, int candidateCount, long[] candidates,
int[] indexesToCheck)
+    {
+        // average averageBoundarySize is 4MiB, max 4GiB, min 4KiB
+        long averageBoundarySize = (4L << 10) * random.nextInt(1 << 20);
+        long prev = 0;
+        for (int i = 1 ; i < candidateCount ; i++)
+            candidates[i] = prev += Math.max(1, averageBoundarySize + (random.nextGaussian()
* averageBoundarySize));
+
+        // generate indexes we will corroborate our behaviour on
+        for (int i = 0 ; i < indexesToCheck.length ; i++)
+            indexesToCheck[i] = 1 + random.nextInt(candidateCount - 2);
+        Arrays.sort(indexesToCheck);
+    }
+
+    /**
+     * Checks the builder against the reference computation in {@code checkBoundaries} for
+     * {@code sampleCount} randomly chosen file lengths, and finally for the length that is exactly
+     * {@code candidates[candidateCount]}.
+     *
+     * @param random         source of randomness
+     * @param candidates     candidate boundary offsets; indexes {@code [0, candidateCount]} are read
+     * @param candidateCount number of candidates passed on to {@code checkBoundaries}; also used as
+     *                       the index of the final length that is checked
+     * @param sampleCount    number of random lengths to check
+     * @param builder        the builder whose output is verified
+     */
+    private static void checkBoundarySample(Random random, long[] candidates, int candidateCount, int sampleCount, Boundaries builder)
+    {
+        for (int i = 0 ; i < sampleCount ; i++)
+        {
+            // pick a number exponentially less likely to be near the beginning, since we test that area earlier
+            int position = 0 ;
+            while (position <= 0)
+                position = candidateCount / randomPowerOfTwo(random);
+            long upperBound = candidates[position];
+            // either shrink a random offset below upperBound by a random power of two,
+            // or start from one of the (up to 63) candidates preceding the chosen position
+            long lowerBound = random.nextBoolean() ? (rand(random, 0, upperBound) / randomPowerOfTwo(random))
+                                                   : candidates[Math.max(0, position - random.nextInt(64))];
+            long length = rand(random, lowerBound, upperBound);
+            checkBoundaries(candidates, candidateCount, builder, length);
+        }
+        checkBoundaries(candidates, candidateCount, builder, candidates[candidateCount]);
+    }
+
+    /**
+     * Returns the lowest set bit of a random int, redrawing whenever that bit is not a positive
+     * value. A draw of 0 has no set bit (which would otherwise cause a division by zero in the
+     * callers) and a draw with only the sign bit set yields {@code Integer.MIN_VALUE} (which would
+     * otherwise produce a negative quotient).
+     */
+    private static int randomPowerOfTwo(Random random)
+    {
+        int bit;
+        do
+        {
+            bit = Integer.lowestOneBit(random.nextInt());
+        }
+        while (bit <= 0);
+        return bit;
+    }
+
+    private static long rand(Random random, long lowerBound, long upperBound)
+    {
+        if (upperBound == lowerBound)
+            return upperBound;
+        return lowerBound + ((random.nextLong() & Long.MAX_VALUE) % (upperBound - lowerBound));
+    }
+
+    /**
+     * Computes the expected boundaries for a file of {@code length} bytes with a straightforward
+     * reference algorithm over {@code candidates} (searching only indexes below
+     * {@code candidateCount}), then asserts that {@code builder.finish(length, false)} returns
+     * exactly the same array. A {@code length} of 0 is not checked.
+     */
+    private static void checkBoundaries(long[] candidates, int candidateCount, Boundaries
builder, long length)
+    {
+        if (length == 0)
+            return;
+
+        // boundaries[0] keeps the array default of 0, which is why count starts at 1
+        long[] boundaries = new long[(int) (10 + 2 * (length / Integer.MAX_VALUE))];
+        int count = 1;
+        int prev = 0;
+        while (true)
+        {
+            // if the very next candidate is already Integer.MAX_VALUE or more past the last boundary,
+            // take it as is; otherwise search [prev, candidateCount) for lastBoundary + Integer.MAX_VALUE
+            int p = candidates[prev + 1] - boundaries[count - 1] >= Integer.MAX_VALUE
+                    ? prev + 1
+                    : Arrays.binarySearch(candidates, prev, candidateCount, boundaries[count
- 1] + Integer.MAX_VALUE);
+            // a miss is reported as -(insertionPoint) - 1; -2 - p converts that to insertionPoint - 1,
+            // i.e. the index of the last candidate below the searched key
+            if (p < 0) p = -2 -p;
+            // stop once the final candidate is reached or the candidate is not below the file length
+            if (p >= candidateCount - 1 || candidates[p] >= length)
+                break;
+            boundaries[count++] = candidates[p];
+            if (candidates[p + 1] >= length)
+                break;
+            prev = p;
+        }
+        // the final candidate becomes a boundary only when it lies below length and the remaining
+        // tail from the last boundary to length is Integer.MAX_VALUE or more
+        if (candidates[candidateCount - 1] < length && length - boundaries[count
- 1] >= Integer.MAX_VALUE)
+            boundaries[count++] = candidates[candidateCount - 1];
+        // the file length always terminates the expected boundaries
+        boundaries[count++] = length;
+        final long[] canon = Arrays.copyOf(boundaries, count);
+        final long[] check = builder.finish(length, false);
+        // compare first so that the (potentially large) failure message is only built on a mismatch
+        if (!Arrays.equals(canon, check))
+            Assert.assertTrue("\n" + Arrays.toString(canon) + "\n" + Arrays.toString(check),
Arrays.equals(canon, check));
+    }
+
+    /** Runs the boundary check-and-repair scenario with 1 row per partition key and a 64KiB row size. */
+    @Test
+    public void testBoundariesAndRepairSmall() throws InvalidRequestException, IOException
+    {
+        final int rows = 1;
+        final int rowSize = 1 << 16;
+        testBoundariesAndRepair(rows, rowSize);
+    }
+
+    /** Runs the boundary check-and-repair scenario with 1 row per partition key and a 1MiB row size. */
+    @Test
+    public void testBoundariesAndRepairMedium() throws InvalidRequestException, IOException
+    {
+        final int rows = 1;
+        final int rowSize = 1 << 20;
+        testBoundariesAndRepair(rows, rowSize);
+    }
+
+    /** Runs the boundary check-and-repair scenario with 1 row per partition key and a 100MiB row size. */
+    @Test
+    public void testBoundariesAndRepairLarge() throws InvalidRequestException, IOException
+    {
+        final int rows = 1;
+        final int rowSize = 100 << 20;
+        testBoundariesAndRepair(rows, rowSize);
+    }
+
+    /**
+     * Runs the boundary check-and-repair scenario with 1 row per partition key and a row size
+     * 1024 bytes short of {@code Integer.MAX_VALUE}.
+     */
+    @Test
+    public void testBoundariesAndRepairHuge() throws InvalidRequestException, IOException
+    {
+        final int rows = 1;
+        final int rowSize = Integer.MAX_VALUE - 1024;
+        testBoundariesAndRepair(rows, rowSize);
+    }
+
+    /**
+     * Runs the boundary check-and-repair scenario with 1 row per partition key and a row size of
+     * exactly {@code Integer.MAX_VALUE}.
+     */
+    @Test
+    public void testBoundariesAndRepairTooHuge() throws InvalidRequestException, IOException
+    {
+        final int rows = 1;
+        final int rowSize = Integer.MAX_VALUE;
+        testBoundariesAndRepair(rows, rowSize);
+    }
+
+    /** Runs the boundary check-and-repair scenario with 128 rows per partition key and a 32KiB row size. */
+    @Test
+    public void testBoundariesAndRepairHugeIndex() throws InvalidRequestException, IOException
+    {
+        final int rows = 1 << 7;
+        final int rowSize = 1 << 15;
+        testBoundariesAndRepair(rows, rowSize);
+    }
+
+    /** Runs the boundary check-and-repair scenario with 16384 rows per partition key and a 32KiB row size. */
+    @Test
+    public void testBoundariesAndRepairReallyHugeIndex() throws InvalidRequestException, IOException
+    {
+        final int rows = 1 << 14;
+        final int rowSize = 1 << 15;
+        testBoundariesAndRepair(rows, rowSize);
+    }
+
+    private void testBoundariesAndRepair(int rows, int rowSize) throws InvalidRequestException,
IOException
+    {
+        String KS = "cql_keyspace";
+        String TABLE = "table1";
+
+        File tempdir = Files.createTempDir();
+        try
+        {
+            Assert.assertTrue(DatabaseDescriptor.getColumnIndexSize() < rowSize);
+            Assert.assertTrue(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
+            Assert.assertTrue(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
+            Assert.assertTrue(StorageService.getPartitioner() instanceof ByteOrderedPartitioner);
+            File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator
+ TABLE);
+            Assert.assertTrue(dataDir.mkdirs());
+
+            String schema = "CREATE TABLE cql_keyspace.table" + (rows > 1 ? "2" : "1")
+ " (k bigint, v1 blob, v2 blob, v3 blob, v4 blob, v5 blob, PRIMARY KEY (k" + (rows > 1
? ", v1" : "") + ")) WITH compression = { 'sstable_compression':'' };";
+            String insert = "INSERT INTO cql_keyspace.table" + (rows > 1 ? "2" : "1")
+ " (k, v1, v2, v3, v4, v5) VALUES (?, ?, ?, ?, ?, ?)";
+
+            CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
+                                                      .inDirectory(dataDir)
+                                                      .forTable(schema)
+                                                      .withPartitioner(StorageService.getPartitioner())
+                                                      .using(insert)
+                                                      .sorted();
+            CQLSSTableWriter writer = builder.build();
+
+            // write 8Gb of decorated keys
+            ByteBuffer[] value = new ByteBuffer[rows];
+            for (int row = 0 ; row < rows ; row++)
+            {
+                // if we're using clustering columns, the clustering key is replicated across
every other column
+                value[row] = ByteBuffer.allocate(rowSize / (rows > 1 ? 8 : 5));
+                value[row].putInt(0, row);
+            }
+            long targetSize = 8L << 30;
+            long dk = 0;
+            long size = 0;
+            long dkSize = rowSize * rows;
+            while (size < targetSize)
+            {
+                for (int row = 0 ; row < rows ; row++)
+                    writer.addRow(dk, value[row], value[row], value[row], value[row], value[row]);
+                size += dkSize;
+                dk++;
+            }
+
+            Descriptor descriptor = writer.getCurrentDescriptor().asType(Descriptor.Type.FINAL);
+            writer.close();
+
+            // open (and close) the reader so that the summary file is created
+            SSTableReader reader = SSTableReader.open(descriptor);
+            reader.selfRef().release();
+
+            // then check the boundaries are reasonable, and corrupt them
+            checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+
+            // then check that reopening corrects the corruption
+            reader = SSTableReader.open(descriptor);
+            reader.selfRef().release();
+            checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+        }
+        finally
+        {
+            FileUtils.deleteRecursive(tempdir);
+        }
+    }
+
+    /**
+     * Reads the summary file of the given sstable, asserts that the index and data boundaries it
+     * contains are as expected, and then rewrites the summary file with the same index summary and
+     * first/last keys but with replacement boundary data (an int 1 followed by the index file
+     * length and the data file length).
+     *
+     * @param descriptor          identifies the sstable whose summary file is checked and rewritten
+     * @param expectDataMmappable expectation passed to {@code assertBoundaries} for the data file
+     * @throws IOException if the summary file cannot be read or written
+     */
+    private static void checkThenCorruptBoundaries(Descriptor descriptor, boolean expectDataMmappable) throws IOException
+    {
+        File summaryFile = new File(descriptor.filenameFor(Component.SUMMARY));
+        IndexSummary indexSummary;
+        ByteBuffer first;
+        ByteBuffer last;
+        MmappedSegmentedFile.Builder ibuilder;
+        MmappedSegmentedFile.Builder dbuilder;
+        DataInputStream iStream = new DataInputStream(new FileInputStream(summaryFile));
+        try
+        {
+            indexSummary = IndexSummary.serializer.deserialize(iStream, StorageService.getPartitioner(), true, CFMetaData.DEFAULT_MIN_INDEX_INTERVAL, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL);
+            first = ByteBufferUtil.readWithLength(iStream);
+            last = ByteBufferUtil.readWithLength(iStream);
+            ibuilder = new MmappedSegmentedFile.Builder();
+            dbuilder = new MmappedSegmentedFile.Builder();
+            ibuilder.deserializeBounds(iStream);
+            dbuilder.deserializeBounds(iStream);
+        }
+        finally
+        {
+            // release the file handle even when deserialization fails
+            iStream.close();
+        }
+        // index file cannot generally be non-mmappable, as index entries cannot be larger than MAX_SEGMENT_SIZE (due to promotedSize being encoded as an int)
+        assertBoundaries(descriptor.filenameFor(Component.PRIMARY_INDEX), true, ibuilder.boundaries());
+        assertBoundaries(descriptor.filenameFor(Component.DATA), expectDataMmappable, dbuilder.boundaries());
+
+        DataOutputStreamPlus oStream = new DataOutputStreamPlus(new FileOutputStream(summaryFile));
+        try
+        {
+            IndexSummary.serializer.serialize(indexSummary, oStream, true);
+            ByteBufferUtil.writeWithLength(first, oStream);
+            ByteBufferUtil.writeWithLength(last, oStream);
+            oStream.writeInt(1);
+            oStream.writeLong(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length());
+            oStream.writeLong(new File(descriptor.filenameFor(Component.DATA)).length());
+        }
+        finally
+        {
+            // release the file handle even when serialization fails
+            oStream.close();
+        }
+    }
+
+    /**
+     * Walks the segments delimited by consecutive entries of {@code boundaries} (the file length
+     * closes the final segment) for as long as the segment start lies below the file length, and
+     * asserts for each one that "segment spans at most {@code Integer.MAX_VALUE} bytes" equals
+     * {@code expectMmappable}.
+     *
+     * @param path            file whose on-disk length terminates the last segment
+     * @param expectMmappable whether every inspected segment is expected to fit in a single mapping
+     * @param boundaries      segment start offsets; must contain at least one entry
+     */
+    private static void assertBoundaries(String path, boolean expectMmappable, long[] boundaries)
+    {
+        long length = new File(path).length();
+        long prev = boundaries[0];
+        for (int i = 1 ; i <= boundaries.length && prev < length ; i++)
+        {
+            long boundary = i == boundaries.length ? length : boundaries[i];
+            // report the offending segment as [start, end): lower bound first
+            Assert.assertEquals(String.format("[%d, %d), %d of %d", prev, boundary, i, boundaries.length),
+                                expectMmappable, boundary - prev <= Integer.MAX_VALUE);
+            prev = boundary;
+        }
+    }
+
+}


Mime
View raw message