From: benedict@apache.org
To: commits@cassandra.apache.org
Date: Wed, 30 Sep 2015 18:48:20 -0000
Message-Id: <26fd6cdbea504732b1c611c676ce3fbc@git.apache.org>
Subject: [02/16] cassandra git commit: Fix Mmapped File Boundaries

Fix Mmapped File Boundaries

This patch fixes two bugs with mmap segment boundary tracking, and
introduces automated correction of these bugs on startup.

patch by benedict; reviewed by tjake for CASSANDRA-10357

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c37562e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c37562e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c37562e3

Branch: refs/heads/cassandra-2.2
Commit: c37562e345c24720c55428a8644191df68319812
Parents: f6cab37
Author: Benedict Elliott Smith
Authored: Wed Sep 16 18:09:32 2015 +0100
Committer: Benedict Elliott Smith
Committed: Wed Sep 30 19:45:49 2015 +0100

----------------------------------------------------------------------
 .../io/sstable/AbstractSSTableSimpleWriter.java |   2 +
 .../cassandra/io/sstable/CQLSSTableWriter.java  |  15 +
 .../cassandra/io/sstable/SSTableReader.java     |  34 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   6 +
 .../io/sstable/SSTableSimpleWriter.java         |  12 +
 .../cassandra/io/util/MappedFileDataInput.java  |   8 +-
 .../cassandra/io/util/MmappedSegmentedFile.java | 270 +++++++++++++---
 .../apache/cassandra/io/util/SegmentedFile.java |   1 +
 .../sstable/LongSegmentedFileBoundaryTest.java  | 322 +++++++++++++++++++
 9 files changed, 601 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
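For context before the diffs: a single mmapped segment can span at most
Integer.MAX_VALUE bytes, since FileChannel.map rejects anything larger and
MappedByteBuffer is int-indexed. The sketch below (a standalone illustration
with hypothetical names, not code from this patch) shows how a boundary list
is turned into mapped segments, mirroring the createSegments logic later in
this diff; any span the boundary tracking fails to keep within the limit
simply cannot be mapped and degrades to buffered reads.

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    public class MmapSketch
    {
        static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;

        // boundaries[0] == 0 and boundaries[boundaries.length - 1] == file length
        static MappedByteBuffer[] map(String path, long[] boundaries) throws Exception
        {
            try (RandomAccessFile raf = new RandomAccessFile(path, "r"))
            {
                MappedByteBuffer[] segments = new MappedByteBuffer[boundaries.length - 1];
                for (int i = 0; i < segments.length; i++)
                {
                    long start = boundaries[i];
                    long size = boundaries[i + 1] - start;
                    // a span larger than the limit cannot be mapped at all;
                    // reads in that range fall back to buffered i/o
                    segments[i] = size <= MAX_SEGMENT_SIZE
                                ? raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size)
                                : null;
                }
                return segments;
            }
        }
    }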
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index 165a4b2..557c3de 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -200,4 +200,6 @@ public abstract class AbstractSSTableSimpleWriter implements Closeable
     protected abstract void writeRow(DecoratedKey key, ColumnFamily columnFamily) throws IOException;
 
     protected abstract ColumnFamily getColumnFamily() throws IOException;
+
+    public abstract Descriptor getCurrentDescriptor();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index b211a90..c364171 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -271,6 +271,16 @@ public class CQLSSTableWriter implements Closeable
         writer.close();
     }
 
+    public Descriptor getCurrentDescriptor()
+    {
+        return writer.getCurrentDescriptor();
+    }
+
+    public CFMetaData getCFMetaData()
+    {
+        return writer.metadata;
+    }
+
     /**
      * A Builder for a CQLSSTableWriter object.
      */
@@ -366,6 +376,11 @@ public class CQLSSTableWriter implements Closeable
             }
         }
 
+        CFMetaData metadata()
+        {
+            return schema;
+        }
+
         /**
          * Adds the specified column family to the specified keyspace.
          *
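As an aside, a sketch of how the new getCurrentDescriptor() accessor can be
used from client code (assuming the cassandra-2.2 classpath and an initialised
environment, as in the burn test at the end of this message; the schema,
directory and values here are placeholders):

    import java.nio.ByteBuffer;

    import org.apache.cassandra.io.sstable.CQLSSTableWriter;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.service.StorageService;

    public class WriterSketch
    {
        public static void main(String[] args) throws Exception
        {
            String schema = "CREATE TABLE ks.t (k bigint, v blob, PRIMARY KEY (k))";
            String insert = "INSERT INTO ks.t (k, v) VALUES (?, ?)";
            CQLSSTableWriter writer = CQLSSTableWriter.builder()
                                                      .inDirectory(new java.io.File("/tmp/ks/t"))
                                                      .forTable(schema)
                                                      .withPartitioner(StorageService.getPartitioner())
                                                      .using(insert)
                                                      .sorted()
                                                      .build();
            writer.addRow(0L, ByteBuffer.allocate(16));
            // new in this patch: callers can discover the sstable being written
            Descriptor current = writer.getCurrentDescriptor();
            System.out.println(current);
            writer.close();
        }
    }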
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 0f307b0..84add6f 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -17,13 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
+import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.*;
@@ -70,20 +64,14 @@ import org.apache.cassandra.dht.LocalPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.FSError;
+import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.sstable.metadata.ValidationMetadata;
-import org.apache.cassandra.io.util.BufferedSegmentedFile;
-import org.apache.cassandra.io.util.CompressedSegmentedFile;
-import org.apache.cassandra.io.util.DataOutputStreamAndChannel;
-import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.io.util.ICompressedFile;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.metrics.RestorableMeter;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -162,6 +150,7 @@ import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR
 public class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class);
+    private static final int ACCURATE_BOUNDARIES_MAGIC_NUMBER = 248923458;
 
     private static final ScheduledThreadPoolExecutor syncExecutor = new ScheduledThreadPoolExecutor(1);
 
     static
@@ -892,6 +881,19 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableReader>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
----------------------------------------------------------------------
-        if (inSegmentPos < 0 || inSegmentPos > buffer.capacity())
+        if (!contains(pos))
             throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity()));
 
         seekInternal((int) inSegmentPos);
     }
 
+    public boolean contains(long pos)
+    {
+        long inSegmentPos = pos - segmentOffset;
+        return inSegmentPos >= 0 && inSegmentPos < buffer.capacity();
+    }
+
     public long getFilePointer()
     {
         return segmentOffset + (long)position;
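To make the new bounds check concrete, a tiny self-contained example of the
arithmetic contains() performs (hypothetical names; a position is legal only
if it falls in [segmentOffset, segmentOffset + capacity)):

    public class ContainsSketch
    {
        static boolean contains(long segmentOffset, int capacity, long pos)
        {
            long inSegmentPos = pos - segmentOffset;
            return inSegmentPos >= 0 && inSegmentPos < capacity;
        }

        public static void main(String[] args)
        {
            // segment covers file offsets [1000, 3000)
            System.out.println(contains(1000, 2000, 2500)); // true
            System.out.println(contains(1000, 2000, 3000)); // false: one past the end
            System.out.println(contains(1000, 2000, 999));  // false: before the segment
        }
    }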
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 1b23343..623f65a 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -24,11 +24,17 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.io.FSReadError;
-import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.IndexSummary;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
 public class MmappedSegmentedFile extends SegmentedFile
@@ -135,52 +141,220 @@ public class MmappedSegmentedFile extends SegmentedFile
         }
     }
 
+    // see CASSANDRA-10357
+    public static boolean maybeRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    {
+        boolean mayNeedRepair = false;
+        if (ibuilder instanceof Builder)
+            mayNeedRepair = ((Builder) ibuilder).mayNeedRepair(descriptor.filenameFor(Component.PRIMARY_INDEX));
+        if (dbuilder instanceof Builder)
+            mayNeedRepair |= ((Builder) dbuilder).mayNeedRepair(descriptor.filenameFor(Component.DATA));
+
+        if (mayNeedRepair)
+            forceRepair(metadata, descriptor, indexSummary, ibuilder, dbuilder);
+        return mayNeedRepair;
+    }
+
+    // if one of the index/data files have boundaries larger than we can mmap, and they were written by a version
+    // that did not guarantee correct boundaries were saved, rebuild the boundaries and save them again
+    private static void forceRepair(CFMetaData metadata, Descriptor descriptor, IndexSummary indexSummary, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+    {
+        if (ibuilder instanceof Builder)
+            ((Builder) ibuilder).boundaries.clear();
+        if (dbuilder instanceof Builder)
+            ((Builder) dbuilder).boundaries.clear();
+
+        try (RandomAccessFile raf = new RandomAccessFile(descriptor.filenameFor(Component.PRIMARY_INDEX), "r");)
+        {
+            long iprev = 0, dprev = 0;
+            for (int i = 0; i < indexSummary.size(); i++)
+            {
+                // first read the position in the summary, and read the corresponding position in the data file
+                long icur = indexSummary.getPosition(i);
+                raf.seek(icur);
+                ByteBufferUtil.readWithShortLength(raf);
+                RowIndexEntry rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
+                long dcur = rie.position;
+
+                // if these positions are small enough to map out a segment from the prior version (i.e. less than 2Gb),
+                // just add these as a boundary and proceed to the next index summary record; most scenarios will be
+                // served by this, keeping the cost of rebuild to a minimum.
+                if (Math.max(icur - iprev, dcur - dprev) > MAX_SEGMENT_SIZE)
+                {
+                    // otherwise, loop over its index block, providing each RIE as a potential boundary for both files
+                    raf.seek(iprev);
+                    while (raf.getFilePointer() < icur)
+                    {
+                        // add the position of this record in the index file as an index file boundary
+                        ibuilder.addPotentialBoundary(raf.getFilePointer());
+                        // then read the RIE, and add its data file position as a boundary for the data file
+                        ByteBufferUtil.readWithShortLength(raf);
+                        rie = metadata.comparator.rowIndexEntrySerializer().deserialize(raf, descriptor.version);
+                        dbuilder.addPotentialBoundary(rie.position);
+                    }
+                }
+
+                ibuilder.addPotentialBoundary(icur);
+                dbuilder.addPotentialBoundary(dcur);
+
+                iprev = icur;
+                dprev = dcur;
+            }
+        }
+        catch (IOException e)
+        {
+            logger.error("Failed to recalculate boundaries for {}; mmap access may degrade to buffered for this file", descriptor);
+        }
+    }
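A simplified standalone illustration (not the patch's code) of the two-speed
strategy forceRepair takes above: a summary entry is accepted as a candidate
outright while consecutive entries stay within one mmap segment, and only an
oversized span forces a walk of the individual index entries in between:

    public class RepairStrategySketch
    {
        static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;

        public static void main(String[] args)
        {
            long[] summaryPositions = { 0L, 1L << 30, 5L << 30 }; // 0, 1GiB, 5GiB
            for (int i = 1; i < summaryPositions.length; i++)
            {
                // mirrors forceRepair's test: only descend when the span is too big to mmap
                boolean descend = summaryPositions[i] - summaryPositions[i - 1] > MAX_SEGMENT_SIZE;
                System.out.println(summaryPositions[i] + (descend
                                   ? " -> walk the index block for finer candidates"
                                   : " -> summary entry alone is a fine candidate"));
            }
        }
    }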
     /**
      * Overrides the default behaviour to create segments of a maximum size.
      */
-    static class Builder extends SegmentedFile.Builder
+    public static class Builder extends SegmentedFile.Builder
     {
-        // planned segment boundaries
-        private List<Long> boundaries;
+        @VisibleForTesting
+        public static class Boundaries
+        {
+            private long[] boundaries;
+
+            // number of boundaries we have "fixed" (i.e. have determined the final value of)
+            private int fixedCount;
+
+            public Boundaries()
+            {
+                // we always have a boundary of zero, so we start with a fixedCount of 1
+                this(new long[8], 1);
+            }
+
+            public Boundaries(long[] boundaries, int fixedCount)
+            {
+                init(boundaries, fixedCount);
+            }
+
+            void init(long[] boundaries, int fixedCount)
+            {
+                this.boundaries = boundaries;
+                this.fixedCount = fixedCount;
+            }
+
+            public void addCandidate(long candidate)
+            {
+                // we make sure we have room before adding another element, so that we can share the addCandidate logic statically
+                boundaries = ensureCapacity(boundaries, fixedCount);
+                fixedCount = addCandidate(boundaries, fixedCount, candidate);
+            }
+
+            private static int addCandidate(long[] boundaries, int fixedCount, long candidate)
+            {
+                // check how far we are from the last fixed boundary
+                long delta = candidate - boundaries[fixedCount - 1];
+                assert delta >= 0;
+                if (delta != 0)
+                {
+                    if (delta <= MAX_SEGMENT_SIZE)
+                        // overwrite the unfixed (potential) boundary if the resultant segment would still be mmappable
+                        boundaries[fixedCount] = candidate;
+                    else if (boundaries[fixedCount] == 0)
+                        // or, if it is not initialised, we cannot make an mmapped segment here, so this is the fixed boundary
+                        boundaries[fixedCount++] = candidate;
+                    else
+                        // otherwise, fix the prior boundary and initialise our unfixed boundary
+                        boundaries[++fixedCount] = candidate;
+                }
+                return fixedCount;
+            }
+
+            // ensures there is room for another fixed boundary AND an unfixed candidate boundary, i.e. fixedCount + 2 items
+            private static long[] ensureCapacity(long[] boundaries, int fixedCount)
+            {
+                if (fixedCount + 1 >= boundaries.length)
+                    return Arrays.copyOf(boundaries, boundaries.length * 2);
+                return boundaries;
+            }
+
+            void clear()
+            {
+                fixedCount = 1;
+                Arrays.fill(boundaries, 0);
+            }
+
+            // returns the fixed boundaries, truncated to a correctly sized long[]
+            public long[] truncate()
+            {
+                return Arrays.copyOf(boundaries, fixedCount);
+            }
 
-        // offset of the open segment (first segment begins at 0).
-        private long currentStart = 0;
+            // returns the finished boundaries for the provided length, truncated to a correctly sized long[]
+            public long[] finish(long length, boolean isFinal)
+            {
+                assert length > 0;
+                // ensure there's room for the length to be added
+                boundaries = ensureCapacity(boundaries, fixedCount);
+
+                // clone our current contents, so we don't corrupt them
+                int fixedCount = this.fixedCount;
+                long[] boundaries = this.boundaries.clone();
+
+                // if we're finishing early, our length may be before some of our boundaries,
+                // so walk backwards until our boundaries are <= length
+                while (boundaries[fixedCount - 1] >= length)
+                    boundaries[fixedCount--] = 0;
+                if (boundaries[fixedCount] >= length)
+                    boundaries[fixedCount] = 0;
+
+                // add our length as a boundary
+                fixedCount = addCandidate(boundaries, fixedCount, length);
+
+                // if we have any unfixed boundary at the end, it's now fixed, since we're done
+                if (boundaries[fixedCount] != 0)
+                    fixedCount++;
+
+                boundaries = Arrays.copyOf(boundaries, fixedCount);
+                if (isFinal)
+                {
+                    // if this is the final one, save it
+                    this.boundaries = boundaries;
+                    this.fixedCount = fixedCount;
+                }
+                return boundaries;
+            }
+        }
 
-        // current length of the open segment.
-        // used to allow merging multiple too-large-to-mmap segments, into a single buffered segment.
-        private long currentSize = 0;
+        private final Boundaries boundaries = new Boundaries();
 
         public Builder()
         {
             super();
-            boundaries = new ArrayList<>();
-            boundaries.add(0L);
         }
 
-        public void addPotentialBoundary(long boundary)
+        public long[] boundaries()
         {
-            if (boundary - currentStart <= MAX_SEGMENT_SIZE)
-            {
-                // boundary fits into current segment: expand it
-                currentSize = boundary - currentStart;
-                return;
-            }
+            return boundaries.truncate();
+        }
 
-            // close the current segment to try and make room for the boundary
-            if (currentSize > 0)
-            {
-                currentStart += currentSize;
-                boundaries.add(currentStart);
-            }
-            currentSize = boundary - currentStart;
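A worked example of the candidate/fixed distinction implemented above, using
the Boundaries class itself (assuming the Cassandra classpath, as the burn
test below does; values chosen so every resulting span is mmappable):

    import org.apache.cassandra.io.util.MmappedSegmentedFile.Builder.Boundaries;

    public class BoundariesSketch
    {
        public static void main(String[] args)
        {
            Boundaries b = new Boundaries();
            b.addCandidate(1L << 30); // 1GiB from 0: provisional, may still be overwritten
            b.addCandidate(2L << 30); // 2GiB from 0 exceeds MAX_SEGMENT_SIZE, so 1GiB becomes fixed
            long[] bounds = b.finish(3L << 30, false);
            // prints [0, 1073741824, 2147483648, 3221225472]: every span is 1GiB
            System.out.println(java.util.Arrays.toString(bounds));
        }
    }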
+        // indicates if we may need to repair the mmapped file boundaries. this is a cheap check to see if there
+        // are any spans larger than an mmap segment size, which should be rare to occur in practice.
+        boolean mayNeedRepair(String path)
+        {
+            // old boundaries were created without the length, so add it as a candidate
+            long length = new File(path).length();
+            boundaries.addCandidate(length);
+            long[] boundaries = this.boundaries.truncate();
 
-            // if we couldn't make room, the boundary needs its own segment
-            if (currentSize > MAX_SEGMENT_SIZE)
+            long prev = 0;
+            for (long boundary : boundaries)
             {
-                currentStart = boundary;
-                boundaries.add(currentStart);
-                currentSize = 0;
+                if (boundary - prev > MAX_SEGMENT_SIZE)
+                    return true;
+                prev = boundary;
             }
+            return false;
+        }
+
+        public void addPotentialBoundary(long boundary)
+        {
+            boundaries.addCandidate(boundary);
         }
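Reduced to its essence, the check above asks whether any pair of adjacent
boundaries, including the implicit final boundary at the file's length, spans
more than one mmap segment. A hypothetical standalone helper expressing the
same test (the real method instead adds the length as a candidate first):

    public class SpanCheckSketch
    {
        static final long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;

        static boolean hasOversizedSpan(long[] boundaries, long fileLength)
        {
            long prev = 0;
            for (long boundary : boundaries)
            {
                if (boundary - prev > MAX_SEGMENT_SIZE)
                    return true;
                prev = boundary;
            }
            // the final span runs from the last recorded boundary to end of file
            return fileLength - prev > MAX_SEGMENT_SIZE;
        }

        public static void main(String[] args)
        {
            // boundaries recorded as [0, 1GiB] for a 4GiB file: the 3GiB tail
            // span cannot be mmapped, so a repair would be triggered
            System.out.println(hasOversizedSpan(new long[]{ 0L, 1L << 30 }, 4L << 30)); // true
        }
    }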
 
         public SegmentedFile complete(String path, long overrideLength, boolean isFinal)
@@ -188,10 +362,10 @@ public class MmappedSegmentedFile extends SegmentedFile
             assert !isFinal || overrideLength <= 0;
             long length = overrideLength > 0 ? overrideLength : new File(path).length();
             // create the segments
-            return new MmappedSegmentedFile(path, length, createSegments(path, length));
+            return new MmappedSegmentedFile(path, length, createSegments(path, length, isFinal));
         }
 
-        private Segment[] createSegments(String path, long length)
+        private Segment[] createSegments(String path, long length, boolean isFinal)
         {
             RandomAccessFile raf;
             try
@@ -203,27 +377,17 @@ public class MmappedSegmentedFile extends SegmentedFile
                 throw new RuntimeException(e);
             }
 
-            // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
-            // we remove these from the end (we loop incase somehow this spans multiple segments, but that would
-            // be a loco dataset
-            while (length < boundaries.get(boundaries.size() - 1))
-                boundaries.remove(boundaries.size() - 1);
-
-            // add a sentinel value == length
-            List<Long> boundaries = new ArrayList<>(this.boundaries);
-            if (length != boundaries.get(boundaries.size() - 1))
-                boundaries.add(length);
-
+            long[] boundaries = this.boundaries.finish(length, isFinal);
 
-            int segcount = boundaries.size() - 1;
+            int segcount = boundaries.length - 1;
             Segment[] segments = new Segment[segcount];
 
             try
             {
                 for (int i = 0; i < segcount; i++)
                 {
-                    long start = boundaries.get(i);
-                    long size = boundaries.get(i + 1) - start;
+                    long start = boundaries[i];
+                    long size = boundaries[i + 1] - start;
                     MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
                                                ? raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size)
                                                : null;
@@ -245,9 +409,10 @@ public class MmappedSegmentedFile extends SegmentedFile
         public void serializeBounds(DataOutput out) throws IOException
         {
             super.serializeBounds(out);
-            out.writeInt(boundaries.size());
-            for (long position: boundaries)
-                out.writeLong(position);
+            long[] boundaries = this.boundaries.truncate();
+            out.writeInt(boundaries.length);
+            for (long boundary : boundaries)
+                out.writeLong(boundary);
         }
 
         @Override
@@ -256,12 +421,11 @@ public class MmappedSegmentedFile extends SegmentedFile
             super.deserializeBounds(in);
 
             int size = in.readInt();
-            List<Long> temp = new ArrayList<>(size);
-
+            long[] boundaries = new long[size];
             for (int i = 0; i < size; i++)
-                temp.add(in.readLong());
+                boundaries[i] = in.readLong();
 
-            boundaries = temp;
+            this.boundaries.init(boundaries, size);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index c65ecbf..23454bc 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 
 import com.google.common.util.concurrent.RateLimiter;
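For reference, a sketch of the boundary trailer implied by serializeBounds and
deserializeBounds above: an int count followed by that many longs (whatever
the superclass writes comes first; the file name here is hypothetical). The
burn test below overwrites this trailer region with bogus boundaries to force
a repair on reopen:

    import java.io.*;

    public class BoundsFormatSketch
    {
        public static void main(String[] args) throws IOException
        {
            long[] boundaries = { 0L, 1L << 30, 2L << 30 };
            try (DataOutputStream out = new DataOutputStream(new FileOutputStream("bounds.bin")))
            {
                out.writeInt(boundaries.length);   // count
                for (long b : boundaries)
                    out.writeLong(b);              // each boundary offset
            }
            try (DataInputStream in = new DataInputStream(new FileInputStream("bounds.bin")))
            {
                long[] read = new long[in.readInt()];
                for (int i = 0; i < read.length; i++)
                    read[i] = in.readLong();
                System.out.println(java.util.Arrays.toString(read));
            }
        }
    }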
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c37562e3/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
----------------------------------------------------------------------
diff --git a/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
new file mode 100644
index 0000000..e17c6a7
--- /dev/null
+++ b/test/burn/org/apache/cassandra/io/sstable/LongSegmentedFileBoundaryTest.java
@@ -0,0 +1,322 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import com.google.common.io.Files;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.io.util.DataOutputStreamPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
+import org.apache.cassandra.io.util.MmappedSegmentedFile.Builder.Boundaries;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class LongSegmentedFileBoundaryTest
+{
+    @BeforeClass
+    public static void setup() throws Exception
+    {
+        SchemaLoader.cleanupAndLeaveDirs();
+        Keyspace.setInitialized();
+        StorageService.instance.initServer();
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        Config.setClientMode(false);
+    }
+
+    @Test
+    public void testRandomBoundaries()
+    {
+        long[] candidates = new long[1 + (1 << 16)];
+        int[] indexesToCheck = new int[1 << 8];
+        Random random = new Random();
+
+        for (int run = 0; run < 100; run++)
+        {
+            long seed = random.nextLong();
+            random.setSeed(seed);
+            System.out.println("Seed: " + seed);
+
+            // at least 1Ki, and as many as 256Ki, boundaries
+            int candidateCount = (1 + random.nextInt(candidates.length >> 10)) << 10;
+            generateBoundaries(random, candidateCount, candidates, indexesToCheck);
+
+            Boundaries builder = new Boundaries();
+            int nextIndexToCheck = indexesToCheck[0];
+            int checkCount = 0;
+            System.out.printf("[0..%d)", candidateCount);
+            for (int i = 1; i < candidateCount - 1; i++)
+            {
+                if (i == nextIndexToCheck)
+                {
+                    if (checkCount % 20 == 0)
+                        System.out.printf(" %d", i);
+                    // grow number of samples logarithmically; work will still increase superlinearly, as size of dataset grows linearly
+                    int sampleCount = 1 << (31 - Integer.numberOfLeadingZeros(++checkCount));
+                    checkBoundarySample(random, candidates, i, sampleCount, builder);
+                    // select out next index to check (there may be dups, so skip them)
+                    while ((nextIndexToCheck = checkCount == indexesToCheck.length ? candidateCount : indexesToCheck[checkCount]) == i)
+                        checkCount++;
+                }
+
+                builder.addCandidate(candidates[i]);
+            }
+            System.out.println();
+            checkBoundaries(candidates, candidateCount - 1, builder, candidates[candidateCount - 1]);
+            Assert.assertEquals(candidateCount, nextIndexToCheck);
+        }
+    }
+
+    private static void generateBoundaries(Random random, int candidateCount, long[] candidates, int[] indexesToCheck)
+    {
+        // average averageBoundarySize is 4MiB, max 4GiB, min 4KiB
+        long averageBoundarySize = (4L << 10) * random.nextInt(1 << 20);
+        long prev = 0;
+        for (int i = 1 ; i < candidateCount ; i++)
+            candidates[i] = prev += Math.max(1, averageBoundarySize + (random.nextGaussian() * averageBoundarySize));
+
+        // generate indexes we will corroborate our behaviour on
+        for (int i = 0 ; i < indexesToCheck.length ; i++)
+            indexesToCheck[i] = 1 + random.nextInt(candidateCount - 2);
+        Arrays.sort(indexesToCheck);
+    }
+
+    private static void checkBoundarySample(Random random, long[] candidates, int candidateCount, int sampleCount, Boundaries builder)
+    {
+        for (int i = 0 ; i < sampleCount ; i++)
+        {
+            // pick a number exponentially less likely to be near the beginning, since we test that area earlier
+            int position = 0 ;
+            while (position <= 0)
+                position = candidateCount / (Integer.lowestOneBit(random.nextInt()));
+            long upperBound = candidates[position];
+            long lowerBound = random.nextBoolean() ? (rand(random, 0, upperBound) / (Integer.lowestOneBit(random.nextInt())))
+                                                   : candidates[Math.max(0, position - random.nextInt(64))];
+            long length = rand(random, lowerBound, upperBound);
+            checkBoundaries(candidates, candidateCount, builder, length);
+        }
+        checkBoundaries(candidates, candidateCount, builder, candidates[candidateCount]);
+    }
+
+    private static long rand(Random random, long lowerBound, long upperBound)
+    {
+        if (upperBound == lowerBound)
+            return upperBound;
+        return lowerBound + ((random.nextLong() & Long.MAX_VALUE) % (upperBound - lowerBound));
+    }
+
+    private static void checkBoundaries(long[] candidates, int candidateCount, Boundaries builder, long length)
+    {
+        if (length == 0)
+            return;
+
+        long[] boundaries = new long[(int) (10 + 2 * (length / Integer.MAX_VALUE))];
+        int count = 1;
+        int prev = 0;
+        while (true)
+        {
+            int p = candidates[prev + 1] - boundaries[count - 1] >= Integer.MAX_VALUE
+                    ? prev + 1
+                    : Arrays.binarySearch(candidates, prev, candidateCount, boundaries[count - 1] + Integer.MAX_VALUE);
+            if (p < 0) p = -2 -p;
+            if (p >= candidateCount - 1 || candidates[p] >= length)
+                break;
+            boundaries[count++] = candidates[p];
+            if (candidates[p + 1] >= length)
+                break;
+            prev = p;
+        }
+        if (candidates[candidateCount - 1] < length && length - boundaries[count - 1] >= Integer.MAX_VALUE)
+            boundaries[count++] = candidates[candidateCount - 1];
+        boundaries[count++] = length;
+        final long[] canon = Arrays.copyOf(boundaries, count);
+        final long[] check = builder.finish(length, false);
+        if (!Arrays.equals(canon, check))
+            Assert.assertTrue("\n" + Arrays.toString(canon) + "\n" + Arrays.toString(check), Arrays.equals(canon, check));
+    }
+
+    @Test
+    public void testBoundariesAndRepairSmall() throws InvalidRequestException, IOException
+    {
+        testBoundariesAndRepair(1, 1 << 16);
+    }
+
+    @Test
+    public void testBoundariesAndRepairMedium() throws InvalidRequestException, IOException
+    {
+        testBoundariesAndRepair(1, 1 << 20);
+    }
+
+    @Test
+    public void testBoundariesAndRepairLarge() throws InvalidRequestException, IOException
+    {
+        testBoundariesAndRepair(1, 100 << 20);
+    }
+
+    @Test
+    public void testBoundariesAndRepairHuge() throws InvalidRequestException, IOException
+    {
+        testBoundariesAndRepair(1, Integer.MAX_VALUE - 1024);
+    }
+
+    @Test
+    public void testBoundariesAndRepairTooHuge() throws InvalidRequestException, IOException
+    {
+        testBoundariesAndRepair(1, Integer.MAX_VALUE);
+    }
+
+    @Test
+    public void testBoundariesAndRepairHugeIndex() throws InvalidRequestException, IOException
+    {
+        testBoundariesAndRepair(1 << 7, 1 << 15);
+    }
+
+    @Test
+    public void testBoundariesAndRepairReallyHugeIndex() throws InvalidRequestException, IOException
+    {
+        testBoundariesAndRepair(1 << 14, 1 << 15);
+    }
+
+    private void testBoundariesAndRepair(int rows, int rowSize) throws InvalidRequestException, IOException
+    {
+        String KS = "cql_keyspace";
+        String TABLE = "table1";
+
+        File tempdir = Files.createTempDir();
+        try
+        {
+            Assert.assertTrue(DatabaseDescriptor.getColumnIndexSize() < rowSize);
+            Assert.assertTrue(DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap);
+            Assert.assertTrue(DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap);
+            Assert.assertTrue(StorageService.getPartitioner() instanceof ByteOrderedPartitioner);
+            File dataDir = new File(tempdir.getAbsolutePath() + File.separator + KS + File.separator + TABLE);
+            Assert.assertTrue(dataDir.mkdirs());
+
+            String schema = "CREATE TABLE cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k bigint, v1 blob, v2 blob, v3 blob, v4 blob, v5 blob, PRIMARY KEY (k" + (rows > 1 ? ", v1" : "") + ")) WITH compression = { 'sstable_compression':'' };";
+            String insert = "INSERT INTO cql_keyspace.table" + (rows > 1 ? "2" : "1") + " (k, v1, v2, v3, v4, v5) VALUES (?, ?, ?, ?, ?, ?)";
+
+            CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder()
+                                                               .inDirectory(dataDir)
+                                                               .forTable(schema)
+                                                               .withPartitioner(StorageService.getPartitioner())
+                                                               .using(insert)
+                                                               .sorted();
+            CQLSSTableWriter writer = builder.build();
+
+            // write 8Gb of decorated keys
+            ByteBuffer[] value = new ByteBuffer[rows];
+            for (int row = 0 ; row < rows ; row++)
+            {
+                // if we're using clustering columns, the clustering key is replicated across every other column
+                value[row] = ByteBuffer.allocate(rowSize / (rows > 1 ? 8 : 5));
+                value[row].putInt(0, row);
+            }
+            long targetSize = 8L << 30;
+            long dk = 0;
+            long size = 0;
+            long dkSize = rowSize * rows;
+            while (size < targetSize)
+            {
+                for (int row = 0 ; row < rows ; row++)
+                    writer.addRow(dk, value[row], value[row], value[row], value[row], value[row]);
+                size += dkSize;
+                dk++;
+            }
+
+            Descriptor descriptor = writer.getCurrentDescriptor().asType(Descriptor.Type.FINAL);
+            writer.close();
+
+            // open (and close) the reader so that the summary file is created
+            SSTableReader reader = SSTableReader.open(descriptor);
+            reader.selfRef().release();
+
+            // then check the boundaries are reasonable, and corrupt them
+            checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+
+            // then check that reopening corrects the corruption
+            reader = SSTableReader.open(descriptor);
+            reader.selfRef().release();
+            checkThenCorruptBoundaries(descriptor, rows * rowSize < Integer.MAX_VALUE);
+        }
+        finally
+        {
+            FileUtils.deleteRecursive(tempdir);
+        }
+    }
+
+    private static void checkThenCorruptBoundaries(Descriptor descriptor, boolean expectDataMmappable) throws IOException
+    {
+        File summaryFile = new File(descriptor.filenameFor(Component.SUMMARY));
+        DataInputStream iStream = new DataInputStream(new FileInputStream(summaryFile));
+        IndexSummary indexSummary = IndexSummary.serializer.deserialize(iStream, StorageService.getPartitioner(), true, CFMetaData.DEFAULT_MIN_INDEX_INTERVAL, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL);
+        ByteBuffer first = ByteBufferUtil.readWithLength(iStream);
+        ByteBuffer last = ByteBufferUtil.readWithLength(iStream);
+        MmappedSegmentedFile.Builder ibuilder = new MmappedSegmentedFile.Builder();
+        MmappedSegmentedFile.Builder dbuilder = new MmappedSegmentedFile.Builder();
+        ibuilder.deserializeBounds(iStream);
+        dbuilder.deserializeBounds(iStream);
+        iStream.close();
+
+        // index file cannot generally be non-mmappable, as index entries cannot be larger than MAX_SEGMENT_SIZE (due to promotedSize being encoded as an int)
+        assertBoundaries(descriptor.filenameFor(Component.PRIMARY_INDEX), true, ibuilder.boundaries());
+        assertBoundaries(descriptor.filenameFor(Component.DATA), expectDataMmappable, dbuilder.boundaries());
+
+        DataOutputStreamPlus oStream = new DataOutputStreamPlus(new FileOutputStream(summaryFile));
+        IndexSummary.serializer.serialize(indexSummary, oStream, true);
+        ByteBufferUtil.writeWithLength(first, oStream);
+        ByteBufferUtil.writeWithLength(last, oStream);
+        oStream.writeInt(1);
+        oStream.writeLong(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)).length());
+        oStream.writeLong(new File(descriptor.filenameFor(Component.DATA)).length());
+        oStream.close();
+    }
+
+    private static void assertBoundaries(String path, boolean expectMmappable, long[] boundaries)
+    {
+        long length = new File(path).length();
+        long prev = boundaries[0];
+        for (int i = 1 ; i <= boundaries.length && prev < length ; i++)
+        {
+            long boundary = i == boundaries.length ? length : boundaries[i];
+            Assert.assertEquals(String.format("[%d, %d), %d of %d", boundary, prev, i, boundaries.length),
+                                expectMmappable, boundary - prev <= Integer.MAX_VALUE);
+            prev = boundary;
+        }
+    }
+
+}
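A closing illustration: the invariants the randomised test above effectively
enforces on Boundaries.finish output, written as a hypothetical helper in the
spirit of assertBoundaries (run with -ea so the asserts fire):

    public class BoundaryInvariants
    {
        static void check(long[] boundaries, long length)
        {
            assert boundaries[0] == 0 : "boundaries always start at zero";
            for (int i = 1; i < boundaries.length; i++)
                assert boundaries[i] > boundaries[i - 1] : "boundaries are strictly increasing";
            assert boundaries[boundaries.length - 1] == length : "the last boundary is the file length";
        }

        public static void main(String[] args)
        {
            check(new long[]{ 0L, 1L << 30, 2L << 30 }, 2L << 30);
            System.out.println("ok");
        }
    }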