cassandra-commits mailing list archives

From bened...@apache.org
Subject [08/16] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Date Wed, 30 Sep 2015 18:48:26 GMT
http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
index bf926e9,0000000..2f00687
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
+++ b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java
@@@ -1,171 -1,0 +1,177 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.io.util;
 +
 +import java.io.*;
 +import java.nio.ByteBuffer;
 +
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +
 +public class ByteBufferDataInput extends AbstractDataInput implements FileDataInput, DataInput
 +{
 +    private final ByteBuffer buffer;
 +    private final String filename;
 +    private final long segmentOffset;
 +    private int position;
 +
 +    public ByteBufferDataInput(ByteBuffer buffer, String filename, long segmentOffset, int position)
 +    {
 +        assert buffer != null;
 +        this.buffer = buffer;
 +        this.filename = filename;
 +        this.segmentOffset = segmentOffset;
 +        this.position = position;
 +    }
 +
 +    // Only use when we know the seek is within the mapped segment. Throws an
 +    // IOException otherwise.
 +    public void seek(long pos) throws IOException
 +    {
 +        long inSegmentPos = pos - segmentOffset;
-         if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
++        if (!contains(pos))
 +            throw new IOException(String.format("Seek position %d is not within mmap segment
(seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
 +
 +        position = (int) inSegmentPos;
 +    }
 +
++    public boolean contains(long pos)
++    {
++        long inSegmentPos = pos - segmentOffset;
++        return inSegmentPos >= 0 && inSegmentPos < buffer.capacity();
++    }
++
 +    public long getFilePointer()
 +    {
 +        return segmentOffset + position;
 +    }
 +
 +    public long getPosition()
 +    {
 +        return segmentOffset + position;
 +    }
 +
 +    public long getPositionLimit()
 +    {
 +        return segmentOffset + buffer.capacity();
 +    }
 +
 +    @Override
 +    public boolean markSupported()
 +    {
 +        return false;
 +    }
 +
 +    public void reset(FileMark mark) throws IOException
 +    {
 +        assert mark instanceof MappedFileDataInputMark;
 +        position = ((MappedFileDataInputMark) mark).position;
 +    }
 +
 +    public FileMark mark()
 +    {
 +        return new MappedFileDataInputMark(position);
 +    }
 +
 +    public long bytesPastMark(FileMark mark)
 +    {
 +        assert mark instanceof MappedFileDataInputMark;
 +        assert position >= ((MappedFileDataInputMark) mark).position;
 +        return position - ((MappedFileDataInputMark) mark).position;
 +    }
 +
 +    public boolean isEOF() throws IOException
 +    {
 +        return position == buffer.capacity();
 +    }
 +
 +    public long bytesRemaining() throws IOException
 +    {
 +        return buffer.capacity() - position;
 +    }
 +
 +    public String getPath()
 +    {
 +        return filename;
 +    }
 +
 +    public int read() throws IOException
 +    {
 +        if (isEOF())
 +            return -1;
 +        return buffer.get(position++) & 0xFF;
 +    }
 +
 +    /**
 +     * Does the same thing as <code>readFully</code>, but without copying data (thread safe)
 +     * @param length length of the bytes to read
 +     * @return buffer with portion of file content
 +     * @throws IOException on any failure of the I/O operation
 +     */
 +    public ByteBuffer readBytes(int length) throws IOException
 +    {
 +        int remaining = buffer.remaining() - position;
 +        if (length > remaining)
 +            throw new IOException(String.format("mmap segment underflow; remaining is %d
but %d requested",
 +                                                remaining, length));
 +
 +        if (length == 0)
 +            return ByteBufferUtil.EMPTY_BYTE_BUFFER;
 +
 +        ByteBuffer bytes = buffer.duplicate();
 +        bytes.position(buffer.position() + position).limit(buffer.position() + position + length);
 +        position += length;
 +
 +        // we have to copy the data in case we unreference the underlying sstable.  See CASSANDRA-3179
 +        ByteBuffer clone = ByteBuffer.allocate(bytes.remaining());
 +        clone.put(bytes);
 +        clone.flip();
 +        return clone;
 +    }
 +
 +    @Override
 +    public final void readFully(byte[] bytes) throws IOException
 +    {
 +        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, 0, bytes.length);
 +        position += bytes.length;
 +    }
 +
 +    @Override
 +    public final void readFully(byte[] bytes, int offset, int count) throws IOException
 +    {
 +        ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, offset, count);
 +        position += count;
 +    }
 +
 +    private static class MappedFileDataInputMark implements FileMark
 +    {
 +        int position;
 +
 +        MappedFileDataInputMark(int position)
 +        {
 +            this.position = position;
 +        }
 +    }
 +
 +    @Override
 +    public String toString() {
 +        return getClass().getSimpleName() + "(" +
 +               "filename='" + filename + "'" +
 +               ", position=" + position +
 +               ")";
 +    }
 +}
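
For readers skimming the hunk above: the patch replaces the open-coded bounds check in seek() with the new contains() method. A minimal standalone sketch (not the committed class; the class name here is illustrative) of the absolute-to-relative arithmetic involved, assuming one ByteBuffer per mmapped segment:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    class SegmentBoundsSketch
    {
        private final ByteBuffer buffer;  // one mmapped segment of the file
        private final long segmentOffset; // absolute file offset where the segment begins
        private int position;             // cursor, relative to the segment start

        SegmentBoundsSketch(ByteBuffer buffer, long segmentOffset)
        {
            this.buffer = buffer;
            this.segmentOffset = segmentOffset;
        }

        // true iff the absolute position falls in [segmentOffset, segmentOffset + capacity)
        boolean contains(long pos)
        {
            long inSegmentPos = pos - segmentOffset;
            return inSegmentPos >= 0 && inSegmentPos < buffer.capacity();
        }

        void seek(long pos) throws IOException
        {
            if (!contains(pos))
                throw new IOException("seek position " + pos + " is outside [" + segmentOffset
                                      + ", " + (segmentOffset + buffer.capacity()) + ")");
            position = (int) (pos - segmentOffset); // cast is safe: a ByteBuffer's capacity is an int
        }
    }

Note that the new check is strict at the upper end: contains() rejects pos == segmentOffset + capacity(), which the old "inSegmentPos > buffer.capacity()" test tolerated.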

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index a8fae9f,623f65a..808b5ad
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@@ -27,6 -28,13 +28,12 @@@ import com.google.common.annotations.Vi
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.db.RowIndexEntry;
 -import org.apache.cassandra.io.FSReadError;
+ import org.apache.cassandra.io.sstable.Component;
+ import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.IndexSummary;
+ import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  
  public class MmappedSegmentedFile extends SegmentedFile
@@@ -135,6 -141,73 +142,74 @@@
          }
      }
  
+     // see CASSANDRA-10357
+     public static boolean maybeRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         boolean mayNeedRepair = false;
+         if (ibuilder instanceof Builder)
+             mayNeedRepair = ((Builder) ibuilder).mayNeedRepair(descriptor.filenameFor(Component.PRIMARY_INDEX));
+         if (dbuilder instanceof Builder)
+             mayNeedRepair |= ((Builder) dbuilder).mayNeedRepair(descriptor.filenameFor(Component.DATA));
+ 
+         if (mayNeedRepair)
+             forceRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder);
+         return mayNeedRepair;
+     }
+ 
+     // if one of the index/data files has boundaries larger than we can mmap, and they were written by a version that did not guarantee correct boundaries were saved,
+     // rebuild the boundaries and save them again
+     private static void forceRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+     {
+         if (ibuilder instanceof Builder)
+             ((Builder) ibuilder).boundaries.clear();
+         if (dbuilder instanceof Builder)
+             ((Builder) dbuilder).boundaries.clear();
+ 
++        RowIndexEntry.IndexSerializer rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata);
+         try (RandomAccessFile raf = new RandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r"))
+         {
+             long iprev = 0, dprev = 0;
+             for (int i = 0; i < indexSummary.size(); i++)
+             {
+                 // first read the position in the summary, and read the corresponding position in the data file
+                 long icur = indexSummary.getPosition(i);
+                 raf.seek(icur);
+                 ByteBufferUtil.readWithShortLength(raf);
 -                RowIndexEntry rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
++                RowIndexEntry rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
+                 long dcur = rie.position;
+ 
+                 // if these positions are small enough to map out a segment from the prior version (i.e. less than 2GiB),
+                 // just add these as a boundary and proceed to the next index summary record; most scenarios will be
+                 // served by this, keeping the cost of rebuild to a minimum.
+ 
+                 if (Math.max(icur - iprev, dcur - dprev) > MAX_SEGMENT_SIZE)
+                 {
+                     // otherwise, loop over its index block, providing each RIE as a potential boundary for both files
+                     raf.seek(iprev);
+                     while (raf.getFilePointer() < icur)
+                     {
+                         // add the position of this record in the index file as an index file boundary
+                         ibuilder.addPotentialBoundary(raf.getFilePointer());
+                         // then read the RIE, and add its data file position as a boundary for the data file
+                         ByteBufferUtil.readWithShortLength(raf);
 -                        rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
++                        rie = rowIndexEntrySerializer.deserialize(raf, descriptor.version);
+                         dbuilder.addPotentialBoundary(rie.position);
+                     }
+                 }
+ 
+                 ibuilder.addPotentialBoundary(icur);
+                 dbuilder.addPotentialBoundary(dcur);
+ 
+                 iprev = icur;
+                 dprev = dcur;
+             }
+         }
+         catch (IOException e)
+         {
+             logger.error("Failed to recalculate boundaries for {}; mmap access may degrade
to buffered for this file", descriptor);
+         }
+     }
+ 
      /**
       * Overrides the default behaviour to create segments of a maximum size.
       */
@@@ -153,68 -326,83 +328,58 @@@
          public Builder()
          {
              super();
-             boundaries = new ArrayList<>();
-             boundaries.add(0L);
          }
  
-         public void addPotentialBoundary(long boundary)
+         public long[] boundaries()
          {
-             if (boundary - currentStart <= MAX_SEGMENT_SIZE)
-             {
-                 // boundary fits into current segment: expand it
-                 currentSize = boundary - currentStart;
-                 return;
-             }
+             return boundaries.truncate();
+         }
  
-             // close the current segment to try and make room for the boundary
-             if (currentSize > 0)
-             {
-                 currentStart += currentSize;
-                 boundaries.add(currentStart);
-             }
-             currentSize = boundary - currentStart;
+         // indicates if we may need to repair the mmapped file boundaries. this is a cheap check to see if there
+         // are any spans larger than an mmap segment size, which should rarely occur in practice.
+         boolean mayNeedRepair(String path)
+         {
+             // old boundaries were created without the length, so add it as a candidate
+             long length = new File(path).length();
+             boundaries.addCandidate(length);
+             long[] boundaries = this.boundaries.truncate();
  
-             // if we couldn't make room, the boundary needs its own segment
-             if (currentSize > MAX_SEGMENT_SIZE)
+             long prev = 0;
+             for (long boundary : boundaries)
              {
-                 currentStart = boundary;
-                 boundaries.add(currentStart);
-                 currentSize = 0;
+                 if (boundary - prev > MAX_SEGMENT_SIZE)
+                     return true;
+                 prev = boundary;
              }
+             return false;
+         }
+ 
+         public void addPotentialBoundary(long boundary)
+         {
+             boundaries.addCandidate(boundary);
          }
  
 -        public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
 +        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
          {
 -            assert !isFinal || overrideLength <= 0;
 -            long length = overrideLength > 0 ? overrideLength : new File(path).length();
 +            long length = overrideLength > 0 ? overrideLength : channel.size();
              // create the segments
-             return new MmappedSegmentedFile(channel, length, createSegments(channel, length));
 -            return new MmappedSegmentedFile(path, length, createSegments(path, length, isFinal));
--        }
 -
 -        private Segment[] createSegments(String path, long length, boolean isFinal)
 -        {
 -            RandomAccessFile raf;
 -            try
 -            {
 -                raf = new RandomAccessFile(path, "r");
 -            }
 -            catch (IOException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
  
-         private Segment[] createSegments(ChannelProxy channel, long length)
-         {
-             // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
-             // we remove these from the end (we loop in case somehow this spans multiple segments, but that would
-             // be a loco dataset)
-             while (length < boundaries.get(boundaries.size() - 1))
-                 boundaries.remove(boundaries.size() -1);
- 
-             // add a sentinel value == length
-             List<Long> boundaries = new ArrayList<>(this.boundaries);
-             if (length != boundaries.get(boundaries.size() - 1))
-                 boundaries.add(length);
- 
-             int segcount = boundaries.size() - 1;
 -            long[] boundaries = this.boundaries.finish(length, isFinal);
++            long[] boundaries = this.boundaries.finish(length, overrideLength <= 0);
+ 
+             int segcount = boundaries.length - 1;
              Segment[] segments = new Segment[segcount];
+ 
 -            try
 -            {
 -                for (int i = 0; i < segcount; i++)
 -                {
 -                    long start = boundaries[i];
 -                    long size = boundaries[i + 1] - start;
 -                    MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
 -                                               ? raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size)
 -                                               : null;
 -                    segments[i] = new Segment(start, segment);
 -                }
 -            }
 -            catch (IOException e)
 -            {
 -                throw new FSReadError(e, path);
 -            }
 -            finally
 +            for (int i = 0; i < segcount; i++)
              {
-                 long start = boundaries.get(i);
-                 long size = boundaries.get(i + 1) - start;
 -                FileUtils.closeQuietly(raf);
++                long start = boundaries[i];
++                long size = boundaries[i + 1] - start;
 +                MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
 +                                           ? channel.map(FileChannel.MapMode.READ_ONLY, start, size)
 +                                           : null;
 +                segments[i] = new Segment(start, segment);
              }
--            return segments;
++
++            return new MmappedSegmentedFile(channel, length, segments);
          }
  
          @Override
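
For context on mayNeedRepair() above: given the candidate boundaries in ascending order, with the file length appended as the final candidate, it only has to look for a gap wider than MAX_SEGMENT_SIZE, since such a span cannot be covered by a single mapped ByteBuffer. A compact sketch of that scan under those assumptions (class and method here are illustrative, not the patch itself):

    class BoundarySpanCheckSketch
    {
        // a single ByteBuffer can map at most Integer.MAX_VALUE bytes (just under 2GiB)
        static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;

        // boundaries: ascending absolute offsets, ending with the file length
        static boolean mayNeedRepair(long[] boundaries)
        {
            long prev = 0;
            for (long boundary : boundaries)
            {
                if (boundary - prev > MAX_SEGMENT_SIZE)
                    return true; // some span is too wide to mmap as one segment
                prev = boundary;
            }
            return false;
        }

        public static void main(String[] args)
        {
            System.out.println(mayNeedRepair(new long[]{ 1L << 30, 4L << 30 })); // true: 3GiB tail span
            System.out.println(mayNeedRepair(new long[]{ 1L << 30, 2L << 30 })); // false: both spans are 1GiB
        }
    }

When the scan fires, forceRepair() rebuilds the boundary list by walking the primary index, as in the hunk above.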

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/SegmentedFile.java
index 66898c6,23454bc..30707d8
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@@ -23,9 -23,9 +23,10 @@@ import java.io.File
  import java.io.IOException;
  import java.nio.MappedByteBuffer;
  import java.util.Iterator;
+ import java.util.List;
  import java.util.NoSuchElementException;
  
 +import com.google.common.base.Throwables;
  import com.google.common.util.concurrent.RateLimiter;
  
  import org.apache.cassandra.config.Config;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/25de92e3/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
----------------------------------------------------------------------
diff --cc test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
index 0000000,e17c6a7..4913b32
mode 000000,100644..100644
--- a/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
+++ b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
@@@ -1,0 -1,322 +1,324 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.cassandra.io.sstable;
+ 
+ import java.io.*;
+ import java.nio.ByteBuffer;
+ import java.util.Arrays;
+ import java.util.Random;
+ 
+ import com.google.common.io.Files;
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import junit.framework.Assert;
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.config.Config;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.dht.ByteOrderedPartitioner;
+ import org.apache.cassandra.exceptions.InvalidRequestException;
++import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.util.DataOutputStreamPlus;
+ import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.MmappedSegmentedFile;
+ import org.apache.cassandra.io.util.MmappedSegmentedFile.Builder.Boundaries;
++import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ 
+ public class LongSegmentedFileBoundaryTest
+ {
+     @BeforeClass
+     public static void setup() throws Exception
+     {
+         SchemaLoader.cleanupAndLeaveDirs();
+         Keyspace.setInitialized();
+         StorageService.instance.initServer();
+     }
+ 
+     @AfterClass
+     public static void tearDown()
+     {
+         Config.setClientMode(false);
+     }
+ 
+     @Test
+     public void testRandomBoundaries()
+     {
+         long[] candidates = new long[1 + (1 << 16)];
+         int[] indexesToCheck = new int[1 << 8];
+         Random random = new Random();
+ 
+         for (int run = 0; run < 100; run++)
+         {
+ 
+             long seed = random.nextLong();
+             random.setSeed(seed);
+             System.out.println("Seed: " + seed);
+ 
+             // at least 1Ki, and as many as 256Ki, boundaries
+             int candidateCount = (1 + random.nextInt(candidates.length >> 10)) << 10;
+             generateBoundaries(random, candidateCount, candidates, indexesToCheck);
+ 
+             Boundaries builder = new Boundaries();
+             int nextIndexToCheck = indexesToCheck[0];
+             int checkCount = 0;
+             System.out.printf("[0..%d)", candidateCount);
+             for (int i = 1; i < candidateCount - 1; i++)
+             {
+                 if (i == nextIndexToCheck)
+                 {
+                     if (checkCount % 20 == 0)
+                         System.out.printf(" %d", i);
+                     // grow number of samples logarithmically; work will still increase superlinearly, as size of dataset grows linearly
+                     int sampleCount = 1 << (31 - Integer.numberOfLeadingZeros(++checkCount));
+                     checkBoundarySample(random, candidates, i, sampleCount, builder);
+                     // select our next index to check (there may be dups, so skip them)
+                 while ((nextIndexToCheck = checkCount == indexesToCheck.length ? candidateCount : indexesToCheck[checkCount]) == i)
+                         checkCount++;
+                 }
+ 
+                 builder.addCandidate(candidates[i]);
+             }
+             System.out.println();
+             checkBoundaries(candidates, candidateCount - 1, builder, candidates[candidateCount - 1]);
+             Assert.assertEquals(candidateCount, nextIndexToCheck);
+         }
+     }
+ 
+     private static void generateBoundaries(Random random, int candidateCount, long[] candidates, int[] indexesToCheck)
+     {
+         // averageBoundarySize is 4MiB on average, max 4GiB, min 4KiB
+         long averageBoundarySize = (4L << 10) * random.nextInt(1 << 20);
+         long prev = 0;
+         for (int i = 1 ; i < candidateCount ; i++)
+             candidates[i] = prev += Math.max(1, averageBoundarySize + (random.nextGaussian() * averageBoundarySize));
+ 
+         // generate the indexes at which we will corroborate our behaviour
+         for (int i = 0 ; i < indexesToCheck.length ; i++)
+             indexesToCheck[i] = 1 + random.nextInt(candidateCount - 2);
+         Arrays.sort(indexesToCheck);
+     }
+ 
+     private static void checkBoundarySample(Random random, long[] candidates, int candidateCount, int sampleCount, Boundaries builder)
+     {
+         for (int i = 0 ; i < sampleCount ; i++)
+         {
+             // pick a number exponentially less likely to be near the beginning, since we test that area earlier
+             int position = 0;
+             while (position <= 0)
+                 position = candidateCount / (Integer.lowestOneBit(random.nextInt()));
+             long upperBound = candidates[position];
+             long lowerBound = random.nextBoolean() ? (rand(random, 0, upperBound) / (Integer.lowestOneBit(random.nextInt())))
+                                                    : candidates[Math.max(0, position - random.nextInt(64))];
+             long length = rand(random, lowerBound, upperBound);
+             checkBoundaries(candidates, candidateCount, builder, length);
+         }
+         checkBoundaries(candidates, candidateCount, builder, candidates[candidateCount]);
+     }
+ 
+     private static long rand(Random random, long lowerBound, long upperBound)
+     {
+         if (upperBound == lowerBound)
+             return upperBound;
+         return lowerBound + ((random.nextLong() & Long.MAX_VALUE) % (upperBound - lowerBound));
+     }
+ 
+     private static void checkBoundaries(long[] candidates, int candidateCount, Boundaries builder, long length)
+     {
+         if (length == 0)
+             return;
+ 
+         long[] boundaries = new long[(int) (10 + 2 * (length / Integer.MAX_VALUE))];
+         int count = 1;
+         int prev = 0;
+         while (true)
+         {
+             int p = candidates[prev + 1] - boundaries[count - 1] >= Integer.MAX_VALUE
+                     ? prev + 1
+                     : Arrays.binarySearch(candidates, prev, candidateCount, boundaries[count - 1] + Integer.MAX_VALUE);
+             if (p < 0) p = -2 - p;
+             if (p >= candidateCount - 1 || candidates[p] >= length)
+                 break;
+             boundaries[count++] = candidates[p];
+             if (candidates[p + 1] >= length)
+                 break;
+             prev = p;
+         }
+         if (candidates[candidateCount - 1] < length && length - boundaries[count - 1] >= Integer.MAX_VALUE)
+             boundaries[count++] = candidates[candidateCount - 1];
+         boundaries[count++] = length;
+         final long[] canon = Arrays.copyOf(boundaries, count);
+         final long[] check = builder.finish(length, false);
+         if (!Arrays.equals(canon, check))
+             Assert.assertTrue("\n" + Arrays.toString(canon) + "\n" + Arrays.toString(check), Arrays.equals(canon, check));
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairSmall() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, 1 << 16);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairMedium() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, 1 << 20);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairLarge() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, 100 << 20);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairHuge() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, Integer.MAX_VALUE - 1024);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairTooHuge() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1, Integer.MAX_VALUE);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairHugeIndex() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1 << 7, 1 << 15);
+     }
+ 
+     @Test
+     public void testBoundariesAndRepairReallyHugeIndex() throws InvalidRequestException, IOException
+     {
+         testBoundariesAndRepair(1 << 14, 1 << 15);
+     }
+ 
+     private void testBoundariesAndRepair(int rows, int rowSize) throws InvalidRequestException, IOException
+     {
+         String KS = "cql_keyspace";
+         String TABLE = "table1";
+ 
+         File tempdir = Files.createTempDir();
+         try
+         {
+             Assert.assertTrue(DatabaseDescriptor.getColumnIndexSize() < rowSize);
+             Assert.assertTrue(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
+             Assert.assertTrue(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
+             Assert.assertTrue(StorageService.getPartitioner() instanceof ByteOrderedPartitioner);
+             File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+             Assert.assertTrue(dataDir.mkdirs());
+ 
+             String schema = "CREATE TABLE cql_keyspace.table" + (rows > 1 ? "2" : "1")
+ " (k bigint, v1 blob, v2 blob, v3 blob, v4 blob, v5 blob, PRIMARY KEY (k" + (rows > 1
? ", v1" : "") + ")) WITH compression = { 'sstable_compression':'' };";
+             String insert = "INSERT INTO cql_keyspace.table" + (rows > 1 ? "2" : "1")
+ " (k, v1, v2, v3, v4, v5) VALUES (?, ?, ?, ?, ?, ?)";
+ 
+             CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
+                                                       .inDirectory(dataDir)
+                                                       .forTable(schema)
+                                                       .withPartitioner(StorageService.getPartitioner())
+                                                       .using(insert)
+                                                       .sorted();
+             CQLSSTableWriter writer = builder.build();
+ 
+             // write 8GiB of decorated keys
+             ByteBuffer[] value = new ByteBuffer[rows];
+             for (int row = 0 ; row < rows ; row++)
+             {
+                 // if we're using clustering columns, the clustering key is replicated across every other column
+                 value[row] = ByteBuffer.allocate(rowSize / (rows > 1 ? 8 : 5));
+                 value[row].putInt(0, row);
+             }
+             long targetSize = 8L << 30;
+             long dk = 0;
+             long size = 0;
+             long dkSize = rowSize * rows;
+             while (size < targetSize)
+             {
+                 for (int row = 0 ; row < rows ; row++)
+                     writer.addRow(dk, value[row], value[row], value[row], value[row], value[row]);
+                 size += dkSize;
+                 dk++;
+             }
+ 
+             Descriptor descriptor = writer.getCurrentDescriptor().asType(Descriptor.Type.FINAL);
+             writer.close();
+ 
+             // open (and close) the reader so that the summary file is created
+             SSTableReader reader = SSTableReader.open(descriptor);
+             reader.selfRef().release();
+ 
+             // then check the boundaries are reasonable, and corrupt them
+             checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+ 
+             // then check that reopening corrects the corruption
+             reader = SSTableReader.open(descriptor);
+             reader.selfRef().release();
+             checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+         }
+         finally
+         {
+             FileUtils.deleteRecursive(tempdir);
+         }
+     }
+ 
+     private static void checkThenCorruptBoundaries(Descriptor descriptor, boolean expectDataMmappable) throws IOException
+     {
+         File summaryFile = new File(descriptor.filenameFor(Component.SUMMARY));
+         DataInputStream iStream = new DataInputStream(new FileInputStream(summaryFile));
+         IndexSummary indexSummary = IndexSummary.serializer.deserialize(iStream, StorageService.getPartitioner(), true, CFMetaData.DEFAULT_MIN_INDEX_INTERVAL, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL);
+         ByteBuffer first = ByteBufferUtil.readWithLength(iStream);
+         ByteBuffer last = ByteBufferUtil.readWithLength(iStream);
+         MmappedSegmentedFile.Builder ibuilder = new MmappedSegmentedFile.Builder();
+         MmappedSegmentedFile.Builder dbuilder = new MmappedSegmentedFile.Builder();
+         ibuilder.deserializeBounds(iStream);
+         dbuilder.deserializeBounds(iStream);
+         iStream.close();
+         // index file cannot generally be non-mmappable, as index entries cannot be larger than MAX_SEGMENT_SIZE (due to promotedSize being encoded as an int)
+         assertBoundaries(descriptor.filenameFor(Component.PRIMARY_INDEX), true, ibuilder.boundaries());
+         assertBoundaries(descriptor.filenameFor(Component.DATA), expectDataMmappable, dbuilder.boundaries());
+ 
 -        DataOutputStreamPlus oStream = new DataOutputStreamPlus(new FileOutputStream(summaryFile));
++        DataOutputStreamPlus oStream = new WrappedDataOutputStreamPlus(new FileOutputStream(summaryFile));
+         IndexSummary.serializer.serialize(indexSummary, oStream, true);
+         ByteBufferUtil.writeWithLength(first, oStream);
+         ByteBufferUtil.writeWithLength(last, oStream);
+         oStream.writeInt(1);
+         oStream.writeLong(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length());
+         oStream.writeLong(new File(descriptor.filenameFor(Component.DATA)).length());
+         oStream.close();
+     }
+ 
+     private static void assertBoundaries(String path, boolean expectMmappable, long[] boundaries)
+     {
+         long length = new File(path).length();
+         long prev = boundaries[0];
+         for (int i = 1 ; i <= boundaries.length && prev < length ; i++)
+         {
+             long boundary = i == boundaries.length ? length : boundaries[i];
+             Assert.assertEquals(String.format("[%d, %d), %d of %d", prev, boundary, i, boundaries.length),
+                                 expectMmappable, boundary - prev <= Integer.MAX_VALUE);
+             prev = boundary;
+         }
+     }
+ 
+ }
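
One detail of the test worth calling out: its rand() helper draws a roughly uniform long in [lowerBound, upperBound) by masking the sign bit before taking the modulo, so the left operand is never negative. Extracted as a self-contained sketch (the class name and main() are illustrative only):

    import java.util.Random;

    class RandRangeSketch
    {
        // roughly uniform long in [lowerBound, upperBound); "& Long.MAX_VALUE" clears the sign
        // bit so the modulo never sees a negative operand (the slight modulo bias is fine here)
        static long rand(Random random, long lowerBound, long upperBound)
        {
            if (upperBound == lowerBound)
                return upperBound;
            return lowerBound + ((random.nextLong() & Long.MAX_VALUE) % (upperBound - lowerBound));
        }

        public static void main(String[] args)
        {
            Random random = new Random(42);
            long sample = rand(random, 4L << 10, 4L << 30);
            System.out.println(sample >= (4L << 10) && sample < (4L << 30)); // true
        }
    }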

