From: benedict@apache.org To: commits@cassandra.apache.org Date: Fri, 04 Sep 2015 11:46:25 -0000 Message-Id: <2138d70ae14a4e95adf2ca766d73a963@git.apache.org> In-Reply-To: <2db66b1b693e411c9270bb001faa4e6b@git.apache.org> References: <2db66b1b693e411c9270bb001faa4e6b@git.apache.org> Subject: [04/10] cassandra git commit: Faster sequential IO (CASSANDRA-8630) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/AbstractDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java b/src/java/org/apache/cassandra/io/util/AbstractDataInput.java deleted file mode 100644 index 29ce2c3..0000000 --- a/src/java/org/apache/cassandra/io/util/AbstractDataInput.java +++ /dev/null @@ -1,343 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.util; - -import java.io.*; - -public abstract class AbstractDataInput extends InputStream implements DataInputPlus -{ - public abstract void seek(long position) throws IOException; - public abstract long getPosition(); - public abstract long getPositionLimit(); - - public int skipBytes(int n) throws IOException - { - if (n <= 0) - return 0; - long oldPosition = getPosition(); - seek(Math.min(getPositionLimit(), oldPosition + n)); - long skipped = getPosition() - oldPosition; - assert skipped >= 0 && skipped <= n; - return (int) skipped; - } - - /** - * Reads a boolean from the current position in this file.
Blocks until one - * byte has been read, the end of the file is reached or an exception is - * thrown. - * - * @return the next boolean value from this file. - * @throws java.io.EOFException - * if the end of this file is detected. - * @throws java.io.IOException - * if this file is closed or another I/O error occurs. - */ - public final boolean readBoolean() throws IOException { - int temp = this.read(); - if (temp < 0) { - throw new EOFException(); - } - return temp != 0; - } - - /** - * Reads an 8-bit byte from the current position in this file. Blocks until - * one byte has been read, the end of the file is reached or an exception is - * thrown. - * - * @return the next signed 8-bit byte value from this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public final byte readByte() throws IOException { - int temp = this.read(); - if (temp < 0) { - throw new EOFException(); - } - return (byte) temp; - } - - /** - * Reads a 16-bit character from the current position in this file. Blocks until - * two bytes have been read, the end of the file is reached or an exception is - * thrown. - * - * @return the next char value from this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public final char readChar() throws IOException { - int ch1 = this.read(); - int ch2 = this.read(); - if ((ch1 | ch2) < 0) - throw new EOFException(); - return (char)((ch1 << 8) + (ch2 << 0)); - } - - /** - * Reads a 64-bit double from the current position in this file. Blocks - * until eight bytes have been read, the end of the file is reached or an - * exception is thrown. - * - * @return the next double value from this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public final double readDouble() throws IOException { - return Double.longBitsToDouble(readLong()); - } - - /** - * Reads a 32-bit float from the current position in this file. Blocks - * until four bytes have been read, the end of the file is reached or an - * exception is thrown. - * - * @return the next float value from this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public final float readFloat() throws IOException { - return Float.intBitsToFloat(readInt()); - } - - /** - * Reads bytes from this file into {@code buffer}. Blocks until {@code - * buffer.length} number of bytes have been read, the end of the file is - * reached or an exception is thrown. - * - * @param buffer - * the buffer to read bytes into. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - * @throws NullPointerException - * if {@code buffer} is {@code null}. - */ - public void readFully(byte[] buffer) throws IOException - { - readFully(buffer, 0, buffer.length); - } - - /** - * Read bytes from this file into {@code buffer} starting at offset {@code - * offset}. This method blocks until {@code count} number of bytes have been - * read. - * - * @param buffer - * the buffer to read bytes into. - * @param offset - * the initial position in {@code buffer} to store the bytes read - * from this file. 
- * @param count - * the maximum number of bytes to store in {@code buffer}. - * @throws EOFException - * if the end of this file is detected. - * @throws IndexOutOfBoundsException - * if {@code offset < 0} or {@code count < 0}, or if {@code - * offset + count} is greater than the length of {@code buffer}. - * @throws IOException - * if this file is closed or another I/O error occurs. - * @throws NullPointerException - * if {@code buffer} is {@code null}. - */ - public void readFully(byte[] buffer, int offset, int count) throws IOException - { - if (buffer == null) { - throw new NullPointerException(); - } - // avoid int overflow - if (offset < 0 || offset > buffer.length || count < 0 - || count > buffer.length - offset) { - throw new IndexOutOfBoundsException(); - } - while (count > 0) { - int result = read(buffer, offset, count); - if (result < 0) { - throw new EOFException(); - } - offset += result; - count -= result; - } - } - - /** - * Reads a 32-bit integer from the current position in this file. Blocks - * until four bytes have been read, the end of the file is reached or an - * exception is thrown. - * - * @return the next int value from this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public int readInt() throws IOException { - int ch1 = this.read(); - int ch2 = this.read(); - int ch3 = this.read(); - int ch4 = this.read(); - if ((ch1 | ch2 | ch3 | ch4) < 0) - throw new EOFException(); - return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); - } - - /** - * Reads a line of text form the current position in this file. A line is - * represented by zero or more characters followed by {@code '\n'}, {@code - * '\r'}, {@code "\r\n"} or the end of file marker. The string does not - * include the line terminating sequence. - *
- * Blocks until a line terminating sequence has been read, the end of the - * file is reached or an exception is thrown. - * - * @return the contents of the line or {@code null} if no characters have - * been read before the end of the file has been reached. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public final String readLine() throws IOException { - StringBuilder line = new StringBuilder(80); // Typical line length - boolean foundTerminator = false; - long unreadPosition = -1; - while (true) { - int nextByte = read(); - switch (nextByte) { - case -1: - return line.length() != 0 ? line.toString() : null; - case (byte) '\r': - if (foundTerminator) { - seek(unreadPosition); - return line.toString(); - } - foundTerminator = true; - /* Have to be able to peek ahead one byte */ - unreadPosition = getPosition(); - break; - case (byte) '\n': - return line.toString(); - default: - if (foundTerminator) { - seek(unreadPosition); - return line.toString(); - } - line.append((char) nextByte); - } - } - } - - /** - * Reads a 64-bit long from the current position in this file. Blocks until - * eight bytes have been read, the end of the file is reached or an - * exception is thrown. - * - * @return the next long value from this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public long readLong() throws IOException { - return ((long)(readInt()) << 32) + (readInt() & 0xFFFFFFFFL); - } - - /** - * Reads a 16-bit short from the current position in this file. Blocks until - * two bytes have been read, the end of the file is reached or an exception - * is thrown. - * - * @return the next short value from this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public short readShort() throws IOException { - int ch1 = this.read(); - int ch2 = this.read(); - if ((ch1 | ch2) < 0) - throw new EOFException(); - return (short)((ch1 << 8) + (ch2 << 0)); - } - - /** - * Reads an unsigned 8-bit byte from the current position in this file and - * returns it as an integer. Blocks until one byte has been read, the end of - * the file is reached or an exception is thrown. - * - * @return the next unsigned byte value from this file as an int. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public final int readUnsignedByte() throws IOException { - int temp = this.read(); - if (temp < 0) { - throw new EOFException(); - } - return temp; - } - - /** - * Reads an unsigned 16-bit short from the current position in this file and - * returns it as an integer. Blocks until two bytes have been read, the end of - * the file is reached or an exception is thrown. - * - * @return the next unsigned short value from this file as an int. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - */ - public int readUnsignedShort() throws IOException { - int ch1 = this.read(); - int ch2 = this.read(); - if ((ch1 | ch2) < 0) - throw new EOFException(); - return (ch1 << 8) + (ch2 << 0); - } - - /** - * Reads a string that is encoded in {@link java.io.DataInput modified UTF-8} from - * this file. 
The number of bytes that must be read for the complete string - * is determined by the first two bytes read from the file. Blocks until all - * required bytes have been read, the end of the file is reached or an - * exception is thrown. - * - * @return the next string encoded in {@link java.io.DataInput modified UTF-8} from - * this file. - * @throws EOFException - * if the end of this file is detected. - * @throws IOException - * if this file is closed or another I/O error occurs. - * @throws java.io.UTFDataFormatException - * if the bytes read cannot be decoded into a character string. - */ - public final String readUTF() throws IOException { - return DataInputStream.readUTF(this); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java index 744e828..090c5bd 100644 --- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java @@ -29,25 +29,8 @@ public class BufferedSegmentedFile extends SegmentedFile super(copy); } - private static class Cleanup extends SegmentedFile.Cleanup - { - protected Cleanup(ChannelProxy channel) - { - super(channel); - } - public void tidy() - { - super.tidy(); - } - } - public static class Builder extends SegmentedFile.Builder { - public void addPotentialBoundary(long boundary) - { - // only one segment in a standard-io file - } - public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength) { long length = overrideLength > 0 ? overrideLength : channel.size(); @@ -55,13 +38,6 @@ public class BufferedSegmentedFile extends SegmentedFile } } - public FileDataInput getSegment(long position) - { - RandomAccessReader reader = RandomAccessReader.open(channel, bufferSize, -1L); - reader.seek(position); - return reader; - } - public BufferedSegmentedFile sharedCopy() { return new BufferedSegmentedFile(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java b/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java deleted file mode 100644 index bf926e9..0000000 --- a/src/java/org/apache/cassandra/io/util/ByteBufferDataInput.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.io.util; - -import java.io.*; -import java.nio.ByteBuffer; - -import org.apache.cassandra.utils.ByteBufferUtil; - -public class ByteBufferDataInput extends AbstractDataInput implements FileDataInput, DataInput -{ - private final ByteBuffer buffer; - private final String filename; - private final long segmentOffset; - private int position; - - public ByteBufferDataInput(ByteBuffer buffer, String filename, long segmentOffset, int position) - { - assert buffer != null; - this.buffer = buffer; - this.filename = filename; - this.segmentOffset = segmentOffset; - this.position = position; - } - - // Only use when we know the seek in within the mapped segment. Throws an - // IOException otherwise. - public void seek(long pos) throws IOException - { - long inSegmentPos = pos - segmentOffset; - if (inSegmentPos < 0 || inSegmentPos > buffer.capacity()) - throw new IOException(String.format("Seek position %d is not within mmap segment (seg offs: %d, length: %d)", pos, segmentOffset, buffer.capacity())); - - position = (int) inSegmentPos; - } - - public long getFilePointer() - { - return segmentOffset + position; - } - - public long getPosition() - { - return segmentOffset + position; - } - - public long getPositionLimit() - { - return segmentOffset + buffer.capacity(); - } - - @Override - public boolean markSupported() - { - return false; - } - - public void reset(FileMark mark) throws IOException - { - assert mark instanceof MappedFileDataInputMark; - position = ((MappedFileDataInputMark) mark).position; - } - - public FileMark mark() - { - return new MappedFileDataInputMark(position); - } - - public long bytesPastMark(FileMark mark) - { - assert mark instanceof MappedFileDataInputMark; - assert position >= ((MappedFileDataInputMark) mark).position; - return position - ((MappedFileDataInputMark) mark).position; - } - - public boolean isEOF() throws IOException - { - return position == buffer.capacity(); - } - - public long bytesRemaining() throws IOException - { - return buffer.capacity() - position; - } - - public String getPath() - { - return filename; - } - - public int read() throws IOException - { - if (isEOF()) - return -1; - return buffer.get(position++) & 0xFF; - } - - /** - * Does the same thing as readFully do but without copying data (thread safe) - * @param length length of the bytes to read - * @return buffer with portion of file content - * @throws IOException on any fail of I/O operation - */ - public ByteBuffer readBytes(int length) throws IOException - { - int remaining = buffer.remaining() - position; - if (length > remaining) - throw new IOException(String.format("mmap segment underflow; remaining is %d but %d requested", - remaining, length)); - - if (length == 0) - return ByteBufferUtil.EMPTY_BYTE_BUFFER; - - ByteBuffer bytes = buffer.duplicate(); - bytes.position(buffer.position() + position).limit(buffer.position() + position + length); - position += length; - - // we have to copy the data in case we unreference the underlying sstable. 
See CASSANDRA-3179 - ByteBuffer clone = ByteBuffer.allocate(bytes.remaining()); - clone.put(bytes); - clone.flip(); - return clone; - } - - @Override - public final void readFully(byte[] bytes) throws IOException - { - ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, 0, bytes.length); - position += bytes.length; - } - - @Override - public final void readFully(byte[] bytes, int offset, int count) throws IOException - { - ByteBufferUtil.arrayCopy(buffer, buffer.position() + position, bytes, offset, count); - position += count; - } - - private static class MappedFileDataInputMark implements FileMark - { - int position; - - MappedFileDataInputMark(int position) - { - this.position = position; - } - } - - @Override - public String toString() { - return getClass().getSimpleName() + "(" + - "filename='" + filename + "'" + - ", position=" + position + - ")"; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ChannelProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChannelProxy.java b/src/java/org/apache/cassandra/io/util/ChannelProxy.java index 79954a5..f866160 100644 --- a/src/java/org/apache/cassandra/io/util/ChannelProxy.java +++ b/src/java/org/apache/cassandra/io/util/ChannelProxy.java @@ -63,7 +63,7 @@ public final class ChannelProxy extends SharedCloseableImpl public ChannelProxy(File file) { - this(file.getAbsolutePath(), openChannel(file)); + this(file.getPath(), openChannel(file)); } public ChannelProxy(String filePath, FileChannel channel) @@ -87,7 +87,7 @@ public final class ChannelProxy extends SharedCloseableImpl final String filePath; final FileChannel channel; - protected Cleanup(String filePath, FileChannel channel) + Cleanup(String filePath, FileChannel channel) { this.filePath = filePath; this.channel = channel; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java index 98869a1..30f1e0c 100644 --- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java @@ -23,43 +23,33 @@ import java.util.zip.CRC32; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Throwables; public class ChecksummedRandomAccessReader extends RandomAccessReader { @SuppressWarnings("serial") public static class CorruptFileException extends RuntimeException { - public final File file; + public final String filePath; - public CorruptFileException(Exception cause, File file) + public CorruptFileException(Exception cause, String filePath) { super(cause); - this.file = file; + this.filePath = filePath; } } private final DataIntegrityMetadata.ChecksumValidator validator; - private final File file; - protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) + private ChecksummedRandomAccessReader(Builder builder) { - super(channel, validator.chunkSize, -1, BufferType.ON_HEAP); - this.validator = validator; - this.file = file; + super(builder); + this.validator = builder.validator; } 
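(For illustration: with this change, callers no longer go through the static open(file, crcFile) factory removed just below, but through the new nested Builder instead. A minimal usage sketch, assuming the reader's usual DataInput-style surface; the file names here are made up:

import java.io.File;
import java.io.IOException;

import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
import org.apache.cassandra.io.util.RandomAccessReader;

public class ChecksummedReadSketch
{
    public static void main(String[] args) throws IOException
    {
        File data = new File("example-data.db");    // hypothetical data file
        File crc = new File("example-data.crc32");  // hypothetical CRC file

        // The Builder wires up the ChannelProxy and ChecksumValidator that the
        // removed static factory created by hand, and fixes the buffer size to
        // the validator's chunk size with an on-heap buffer.
        RandomAccessReader reader = new ChecksummedRandomAccessReader.Builder(data, crc).build();
        try
        {
            byte first = reader.readByte(); // each refill verifies a chunk against its CRC
        }
        finally
        {
            reader.close();
        }
    }
}

End of illustration; the patch resumes below.)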
@SuppressWarnings("resource") - public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException - { - ChannelProxy channel = new ChannelProxy(file); - RandomAccessReader crcReader = RandomAccessReader.open(crcFile); - DataIntegrityMetadata.ChecksumValidator validator = - new DataIntegrityMetadata.ChecksumValidator(new CRC32(), crcReader, file.getPath()); - return new ChecksummedRandomAccessReader(file, channel, validator); - } - @Override - protected void reBuffer() + protected void reBufferStandard() { long desiredPosition = current(); // align with buffer size, as checksums were computed in chunks of buffer size each. @@ -84,13 +74,19 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader } catch (IOException e) { - throw new CorruptFileException(e, file); + throw new CorruptFileException(e, channel.filePath()); } buffer.position((int) (desiredPosition - bufferOffset)); } @Override + protected void reBufferMmap() + { + throw new AssertionError("Unsupported operation"); + } + + @Override public void seek(long newPosition) { validator.seek(newPosition); @@ -100,14 +96,32 @@ public class ChecksummedRandomAccessReader extends RandomAccessReader @Override public void close() { - try + Throwables.perform(channel.filePath(), Throwables.FileOpType.READ, + super::close, + validator::close, + channel::close); + } + + public static final class Builder extends RandomAccessReader.Builder + { + private final DataIntegrityMetadata.ChecksumValidator validator; + + @SuppressWarnings("resource") + public Builder(File file, File crcFile) throws IOException { - super.close(); + super(new ChannelProxy(file)); + this.validator = new DataIntegrityMetadata.ChecksumValidator(new CRC32(), + RandomAccessReader.open(crcFile), + file.getPath()); + + super.bufferSize(validator.chunkSize) + .bufferType(BufferType.ON_HEAP); } - finally + + @Override + public RandomAccessReader build() { - channel.close(); - validator.close(); + return new ChecksummedRandomAccessReader(this); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java index 95c61c1..16f791a 100644 --- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java @@ -17,44 +17,49 @@ */ package org.apache.cassandra.io.util; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.util.TreeMap; - import com.google.common.util.concurrent.RateLimiter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.compress.CompressedRandomAccessReader; import org.apache.cassandra.io.compress.CompressedSequentialWriter; -import org.apache.cassandra.io.compress.CompressedThrottledReader; import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.concurrent.Ref; public class CompressedSegmentedFile extends SegmentedFile implements ICompressedFile { - public final CompressionMetadata metadata; + private static final Logger logger = LoggerFactory.getLogger(CompressedSegmentedFile.class); private static 
final boolean useMmap = DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap; - private static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE; - private final TreeMap chunkSegments; + + public final CompressionMetadata metadata; + private final MmappedRegions regions; public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata) { - this(channel, bufferSize, metadata, createMappedSegments(channel, metadata)); + this(channel, + bufferSize, + metadata, + useMmap + ? MmappedRegions.map(channel, metadata) + : null); } - public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, TreeMap chunkSegments) + public CompressedSegmentedFile(ChannelProxy channel, int bufferSize, CompressionMetadata metadata, MmappedRegions regions) { - super(new Cleanup(channel, metadata, chunkSegments), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength); + super(new Cleanup(channel, metadata, regions), channel, bufferSize, metadata.dataLength, metadata.compressedFileLength); this.metadata = metadata; - this.chunkSegments = chunkSegments; + this.regions = regions; } private CompressedSegmentedFile(CompressedSegmentedFile copy) { super(copy); this.metadata = copy.metadata; - this.chunkSegments = copy.chunkSegments; + this.regions = copy.regions; } public ChannelProxy channel() @@ -62,60 +67,36 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse return channel; } - public TreeMap chunkSegments() + public MmappedRegions regions() { - return chunkSegments; - } - - static TreeMap createMappedSegments(ChannelProxy channel, CompressionMetadata metadata) - { - if (!useMmap) - return null; - TreeMap chunkSegments = new TreeMap<>(); - long offset = 0; - long lastSegmentOffset = 0; - long segmentSize = 0; - - while (offset < metadata.dataLength) - { - CompressionMetadata.Chunk chunk = metadata.chunkFor(offset); - - //Reached a new mmap boundary - if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE) - { - chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); - lastSegmentOffset += segmentSize; - segmentSize = 0; - } - - segmentSize += chunk.length + 4; //checksum - offset += metadata.chunkLength(); - } - - if (segmentSize > 0) - chunkSegments.put(lastSegmentOffset, channel.map(FileChannel.MapMode.READ_ONLY, lastSegmentOffset, segmentSize)); - return chunkSegments; + return regions; } private static final class Cleanup extends SegmentedFile.Cleanup { final CompressionMetadata metadata; - final TreeMap chunkSegments; - protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, TreeMap chunkSegments) + private final MmappedRegions regions; + + protected Cleanup(ChannelProxy channel, CompressionMetadata metadata, MmappedRegions regions) { super(channel); this.metadata = metadata; - this.chunkSegments = chunkSegments; + this.regions = regions; } public void tidy() { - super.tidy(); - metadata.close(); - if (chunkSegments != null) + Throwable err = regions == null ? 
null : regions.close(null); + if (err != null) { - for (MappedByteBuffer segment : chunkSegments.values()) - FileUtils.clean(segment); + JVMStabilityInspector.inspectThrowable(err); + + // This is not supposed to happen + logger.error("Error while closing mmapped regions", err); } + + metadata.close(); + + super.tidy(); } } @@ -132,17 +113,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse public static class Builder extends SegmentedFile.Builder { - protected final CompressedSequentialWriter writer; + final CompressedSequentialWriter writer; public Builder(CompressedSequentialWriter writer) { this.writer = writer; } - public void addPotentialBoundary(long boundary) - { - // only one segment in a standard-io file - } - protected CompressionMetadata metadata(String path, long overrideLength) { if (writer == null) @@ -166,12 +142,12 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse public RandomAccessReader createReader() { - return CompressedRandomAccessReader.open(this); + return new CompressedRandomAccessReader.Builder(this).build(); } - public RandomAccessReader createThrottledReader(RateLimiter limiter) + public RandomAccessReader createReader(RateLimiter limiter) { - return CompressedThrottledReader.open(this, limiter); + return new CompressedRandomAccessReader.Builder(this).limiter(limiter).build(); } public CompressionMetadata getMetadata() http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/DataInputBuffer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java index 63091d0..a68dcc2 100644 --- a/src/java/org/apache/cassandra/io/util/DataInputBuffer.java +++ b/src/java/org/apache/cassandra/io/util/DataInputBuffer.java @@ -21,13 +21,10 @@ import java.io.IOException; import java.nio.ByteBuffer; /** - * Input stream around a fixed ByteBuffer. Necessary to have this derived class to avoid NIODataInputStream's - * shuffling of bytes behavior in readNext() - * + * Input stream around a single ByteBuffer. */ -public class DataInputBuffer extends NIODataInputStream +public class DataInputBuffer extends RebufferingInputStream { - private static ByteBuffer slice(byte[] buffer, int offset, int length) { ByteBuffer buf = ByteBuffer.wrap(buffer); @@ -41,13 +38,12 @@ public class DataInputBuffer extends NIODataInputStream } /** - * - * @param buf + * @param buffer * @param duplicate Whether or not to duplicate the buffer to ensure thread safety */ - public DataInputBuffer(ByteBuffer buf, boolean duplicate) + public DataInputBuffer(ByteBuffer buffer, boolean duplicate) { - super(buf, duplicate); + super(duplicate ? buffer.duplicate() : buffer); } public DataInputBuffer(byte[] buffer, int offset, int length) @@ -61,8 +57,14 @@ public class DataInputBuffer extends NIODataInputStream } @Override - protected int readNext() throws IOException + protected void reBuffer() throws IOException + { + //nope, we don't rebuffer, we are done! 
+ } + + @Override + public int available() throws IOException { - return -1; + return buffer.remaining(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index 70cd860..01c0049 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -34,6 +34,7 @@ import com.google.common.base.Charsets; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.utils.Throwables; public class DataIntegrityMetadata { @@ -138,14 +139,8 @@ public class DataIntegrityMetadata public void close() { - try - { - this.digestReader.close(); - } - finally - { - this.dataReader.close(); - } + Throwables.perform(digestReader::close, + dataReader::close); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/FileDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileDataInput.java b/src/java/org/apache/cassandra/io/util/FileDataInput.java index b63b750..f56193b 100644 --- a/src/java/org/apache/cassandra/io/util/FileDataInput.java +++ b/src/java/org/apache/cassandra/io/util/FileDataInput.java @@ -19,31 +19,22 @@ package org.apache.cassandra.io.util; import java.io.Closeable; import java.io.IOException; -import java.nio.ByteBuffer; public interface FileDataInput extends DataInputPlus, Closeable { - public String getPath(); + String getPath(); - public boolean isEOF() throws IOException; + boolean isEOF() throws IOException; - public long bytesRemaining() throws IOException; + long bytesRemaining() throws IOException; - public void seek(long pos) throws IOException; + void seek(long pos) throws IOException; - public FileMark mark(); + FileMark mark(); - public void reset(FileMark mark) throws IOException; + void reset(FileMark mark) throws IOException; - public long bytesPastMark(FileMark mark); + long bytesPastMark(FileMark mark); - public long getFilePointer(); - - /** - * Read length bytes from current file position - * @param length length of the bytes to read - * @return buffer with bytes read - * @throws IOException if any I/O operation failed - */ - public ByteBuffer readBytes(int length) throws IOException; + long getFilePointer(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java new file mode 100644 index 0000000..425c7d6 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/FileSegmentInputStream.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.io.util; + +import java.nio.ByteBuffer; + +/** + * This is the same as DataInputBuffer, i.e. a stream for a fixed byte buffer, + * except that we also implement FileDataInput by using an offset and a file path. + */ +public class FileSegmentInputStream extends DataInputBuffer implements FileDataInput +{ + private final String filePath; + private final long offset; + + public FileSegmentInputStream(ByteBuffer buffer, String filePath, long offset) + { + super(buffer, false); + this.filePath = filePath; + this.offset = offset; + } + + public String getPath() + { + return filePath; + } + + private long size() + { + return offset + buffer.capacity(); + } + + public boolean isEOF() + { + return !buffer.hasRemaining(); + } + + public long bytesRemaining() + { + return buffer.remaining(); + } + + public void seek(long pos) + { + if (pos < 0 || pos > size()) + throw new IllegalArgumentException(String.format("Unable to seek to position %d in %s (%d bytes) in partial mode", + pos, + getPath(), + size())); + + + buffer.position((int) (pos - offset)); + } + + @Override + public boolean markSupported() + { + return false; + } + + public FileMark mark() + { + throw new UnsupportedOperationException(); + } + + public void reset(FileMark mark) + { + throw new UnsupportedOperationException(); + } + + public long bytesPastMark(FileMark mark) + { + return 0; + } + + public long getFilePointer() + { + return offset + buffer.position(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/ICompressedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/ICompressedFile.java b/src/java/org/apache/cassandra/io/util/ICompressedFile.java index ce7b22c..43d37dc 100644 --- a/src/java/org/apache/cassandra/io/util/ICompressedFile.java +++ b/src/java/org/apache/cassandra/io/util/ICompressedFile.java @@ -17,14 +17,11 @@ */ package org.apache.cassandra.io.util; -import java.nio.MappedByteBuffer; -import java.util.TreeMap; - import org.apache.cassandra.io.compress.CompressionMetadata; public interface ICompressedFile { - public ChannelProxy channel(); - public CompressionMetadata getMetadata(); - public TreeMap chunkSegments(); + ChannelProxy channel(); + CompressionMetadata getMetadata(); + MmappedRegions regions(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/MemoryInputStream.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java index 45261e0..e009528 100644 --- a/src/java/org/apache/cassandra/io/util/MemoryInputStream.java +++ b/src/java/org/apache/cassandra/io/util/MemoryInputStream.java @@ -19,50 +19,58 @@ package org.apache.cassandra.io.util; import 
java.io.DataInput; import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; -public class MemoryInputStream extends AbstractDataInput implements DataInput +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; + +import org.apache.cassandra.utils.memory.MemoryUtil; + +public class MemoryInputStream extends RebufferingInputStream implements DataInput { private final Memory mem; - private int position = 0; + private final int bufferSize; + private long offset; - public MemoryInputStream(Memory mem) - { - this.mem = mem; - } - public int read() throws IOException + public MemoryInputStream(Memory mem) { - return mem.getByte(position++) & 0xFF; + this(mem, Ints.saturatedCast(mem.size)); } - public void readFully(byte[] buffer, int offset, int count) throws IOException + @VisibleForTesting + public MemoryInputStream(Memory mem, int bufferSize) { - mem.getBytes(position, buffer, offset, count); - position += count; + super(getByteBuffer(mem.peer, bufferSize)); + this.mem = mem; + this.bufferSize = bufferSize; + this.offset = mem.peer + bufferSize; } - public void seek(long pos) + @Override + protected void reBuffer() throws IOException { - position = (int) pos; - } + if (offset - mem.peer >= mem.size()) + return; - public long getPosition() - { - return position; + buffer = getByteBuffer(offset, Math.min(bufferSize, Ints.saturatedCast(memRemaining()))); + offset += buffer.capacity(); } - public long getPositionLimit() + @Override + public int available() { - return mem.size(); + return Ints.saturatedCast(buffer.remaining() + memRemaining()); } - protected long length() + private long memRemaining() { - return mem.size(); + return mem.size + mem.peer - offset; } - public void close() + private static ByteBuffer getByteBuffer(long offset, int length) { - // do nothing. + return MemoryUtil.getByteBuffer(offset, length).order(ByteOrder.BIG_ENDIAN); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/MmappedRegions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MmappedRegions.java b/src/java/org/apache/cassandra/io/util/MmappedRegions.java new file mode 100644 index 0000000..8f6cd92 --- /dev/null +++ b/src/java/org/apache/cassandra/io/util/MmappedRegions.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.io.util; + +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.util.Arrays; + +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.utils.Throwables; +import org.apache.cassandra.utils.concurrent.RefCounted; +import org.apache.cassandra.utils.concurrent.SharedCloseableImpl; + +import static java.util.stream.Stream.of; +import static org.apache.cassandra.utils.Throwables.perform; + +public class MmappedRegions extends SharedCloseableImpl +{ + /** In a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size */ + public static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE; + + /** When we need to grow the arrays, we add this number of region slots */ + static final int REGION_ALLOC_SIZE = 15; + + /** The original state, which is shared with the tidier and + * contains all the regions mapped so far. It also + * does the actual mapping. */ + private final State state; + + /** A copy of the latest state. We update this each time the original state is + * updated and we share this with copies. If we are a copy, then this + * is null. Copies can only access existing regions, they cannot create + * new ones. This is for thread safety and because MmappedRegions is + * reference counted, only the original state will be cleaned-up, + * therefore only the original state should create new mapped regions. + */ + private volatile State copy; + + private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length) + { + this(new State(channel), metadata, length); + } + + private MmappedRegions(State state, CompressionMetadata metadata, long length) + { + super(new Tidier(state)); + + this.state = state; + + if (metadata != null) + { + assert length == 0 : "expected no length with metadata"; + updateState(metadata); + } + else if (length > 0) + { + updateState(length); + } + + this.copy = new State(state); + } + + private MmappedRegions(MmappedRegions original) + { + super(original); + this.state = original.copy; + } + + public static MmappedRegions empty(ChannelProxy channel) + { + return new MmappedRegions(channel, null, 0); + } + + public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metadata) + { + if (metadata == null) + throw new IllegalArgumentException("metadata cannot be null"); + + return new MmappedRegions(channel, metadata, 0); + } + + public static MmappedRegions map(ChannelProxy channel, long length) + { + if (length <= 0) + throw new IllegalArgumentException("Length must be positive"); + + return new MmappedRegions(channel, null, length); + } + + /** + * @return a snapshot of the memory mapped regions. The snapshot can + * only use existing regions, it cannot create new ones. 
+ */ + public MmappedRegions sharedCopy() + { + return new MmappedRegions(this); + } + + private boolean isCopy() + { + return copy == null; + } + + public void extend(long length) + { + if (length < 0) + throw new IllegalArgumentException("Length must not be negative"); + + assert !isCopy() : "Copies cannot be extended"; + + if (length <= state.length) + return; + + updateState(length); + copy = new State(state); + } + + private void updateState(long length) + { + state.length = length; + long pos = state.getPosition(); + while (pos < length) + { + long size = Math.min(MAX_SEGMENT_SIZE, length - pos); + state.add(pos, size); + pos += size; + } + } + + private void updateState(CompressionMetadata metadata) + { + long offset = 0; + long lastSegmentOffset = 0; + long segmentSize = 0; + + while (offset < metadata.dataLength) + { + CompressionMetadata.Chunk chunk = metadata.chunkFor(offset); + + //Reached a new mmap boundary + if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE) + { + if (segmentSize > 0) + { + state.add(lastSegmentOffset, segmentSize); + lastSegmentOffset += segmentSize; + segmentSize = 0; + } + } + + segmentSize += chunk.length + 4; //checksum + offset += metadata.chunkLength(); + } + + if (segmentSize > 0) + state.add(lastSegmentOffset, segmentSize); + + state.length = lastSegmentOffset + segmentSize; + } + + public boolean isValid(ChannelProxy channel) + { + return state.isValid(channel); + } + + public boolean isEmpty() + { + return state.isEmpty(); + } + + public Region floor(long position) + { + assert !isCleanedUp() : "Attempted to use closed region"; + return state.floor(position); + } + + public static final class Region + { + public final long offset; + public final ByteBuffer buffer; + + public Region(long offset, ByteBuffer buffer) + { + this.offset = offset; + this.buffer = buffer; + } + + public long bottom() + { + return offset; + } + + public long top() + { + return offset + buffer.capacity(); + } + } + + private static final class State + { + /** The file channel */ + private final ChannelProxy channel; + + /** An array of region buffers, synchronized with offsets */ + private ByteBuffer[] buffers; + + /** An array of region offsets, synchronized with buffers */ + private long[] offsets; + + /** The maximum file length we have mapped */ + private long length; + + /** The index to the last region added */ + private int last; + + private State(ChannelProxy channel) + { + this.channel = channel.sharedCopy(); + this.buffers = new ByteBuffer[REGION_ALLOC_SIZE]; + this.offsets = new long[REGION_ALLOC_SIZE]; + this.length = 0; + this.last = -1; + } + + private State(State original) + { + this.channel = original.channel; + this.buffers = original.buffers; + this.offsets = original.offsets; + this.length = original.length; + this.last = original.last; + } + + private boolean isEmpty() + { + return last < 0; + } + + private boolean isValid(ChannelProxy channel) + { + return this.channel.filePath().equals(channel.filePath()); + } + + private Region floor(long position) + { + assert 0 <= position && position < length : String.format("%d >= %d", position, length); + + int idx = Arrays.binarySearch(offsets, 0, last +1, position); + assert idx != -1 : String.format("Bad position %d for regions %s, last %d in %s", position, Arrays.toString(offsets), last, channel); + if (idx < 0) + idx = -(idx + 2); // round down to entry at insertion point + + return new Region(offsets[idx], buffers[idx]); + } + + private long getPosition() + { + return last < 0 ? 
0 : offsets[last] + buffers[last].capacity(); + } + + private void add(long pos, long size) + { + ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, pos, size); + + ++last; + + if (last == offsets.length) + { + offsets = Arrays.copyOf(offsets, offsets.length + REGION_ALLOC_SIZE); + buffers = Arrays.copyOf(buffers, buffers.length + REGION_ALLOC_SIZE); + } + + offsets[last] = pos; + buffers[last] = buffer; + } + + private Throwable close(Throwable accumulate) + { + accumulate = channel.close(accumulate); + + /* + * Try forcing the unmapping of segments using undocumented unsafe sun APIs. + * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping. + * If this works and a thread tries to access any segment, hell will unleash on earth. + */ + if (!FileUtils.isCleanerAvailable()) + return accumulate; + + return perform(accumulate, channel.filePath(), Throwables.FileOpType.READ, + of(buffers) + .map((buffer) -> + () -> + { + if (buffer != null) + FileUtils.clean(buffer); + })); + } + } + + public static final class Tidier implements RefCounted.Tidy + { + final State state; + + Tidier(State state) + { + this.state = state; + } + + public String name() + { + return state.channel.filePath(); + } + + public void tidy() + { + try + { + Throwables.maybeFail(state.close(null)); + } + catch (Exception e) + { + throw new FSReadError(e, state.channel.filePath()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java index 879ca6f..5f56ff6 100644 --- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java +++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java @@ -18,41 +18,31 @@ package org.apache.cassandra.io.util; import java.io.*; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.utils.JVMStabilityInspector; public class MmappedSegmentedFile extends SegmentedFile { private static final Logger logger = LoggerFactory.getLogger(MmappedSegmentedFile.class); - // in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane. - public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE; + private final MmappedRegions regions; - /** - * Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the - * segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back - * to a RandomAccessFile. 
- */ - private final Segment[] segments; - - public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, Segment[] segments) + public MmappedSegmentedFile(ChannelProxy channel, int bufferSize, long length, MmappedRegions regions) { - super(new Cleanup(channel, segments), channel, bufferSize, length); - this.segments = segments; + super(new Cleanup(channel, regions), channel, bufferSize, length); + this.regions = regions; } private MmappedSegmentedFile(MmappedSegmentedFile copy) { super(copy); - this.segments = copy.segments; + this.regions = copy.regions; } public MmappedSegmentedFile sharedCopy() @@ -60,78 +50,46 @@ public class MmappedSegmentedFile extends SegmentedFile return new MmappedSegmentedFile(this); } - /** - * @return The segment entry for the given position. - */ - private Segment floor(long position) + public RandomAccessReader createReader() { - assert 0 <= position && position < length: String.format("%d >= %d in %s", position, length, path()); - Segment seg = new Segment(position, null); - int idx = Arrays.binarySearch(segments, seg); - assert idx != -1 : String.format("Bad position %d for segments %s in %s", position, Arrays.toString(segments), path()); - if (idx < 0) - // round down to entry at insertion point - idx = -(idx + 2); - return segments[idx]; + return new RandomAccessReader.Builder(channel) + .overrideLength(length) + .regions(regions) + .build(); } - /** - * @return The segment containing the given position: must be closed after use. - */ - public FileDataInput getSegment(long position) + public RandomAccessReader createReader(RateLimiter limiter) { - Segment segment = floor(position); - if (segment.right != null) - { - // segment is mmap'd - return new ByteBufferDataInput(segment.right, path(), segment.left, (int) (position - segment.left)); - } - - // we can have single cells or partitions larger than 2Gb, which is our maximum addressable range in a single segment; - // in this case we open as a normal random access reader - // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row - RandomAccessReader file = RandomAccessReader.open(channel, bufferSize, -1L); - file.seek(position); - return file; + return new RandomAccessReader.Builder(channel) + .overrideLength(length) + .bufferSize(bufferSize) + .regions(regions) + .limiter(limiter) + .build(); } private static final class Cleanup extends SegmentedFile.Cleanup { - final Segment[] segments; - protected Cleanup(ChannelProxy channel, Segment[] segments) + private final MmappedRegions regions; + + Cleanup(ChannelProxy channel, MmappedRegions regions) { super(channel); - this.segments = segments; + this.regions = regions; } public void tidy() { - super.tidy(); - - if (!FileUtils.isCleanerAvailable()) - return; - - /* - * Try forcing the unmapping of segments using undocumented unsafe sun APIs. - * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping. - * If this works and a thread tries to access any segment, hell will unleash on earth. 
- */ - try - { - for (Segment segment : segments) - { - if (segment.right == null) - continue; - FileUtils.clean(segment.right); - } - logger.debug("All segments have been unmapped successfully"); - } - catch (Exception e) + Throwable err = regions.close(null); + if (err != null) { - JVMStabilityInspector.inspectThrowable(e); + JVMStabilityInspector.inspectThrowable(err); + // This is not supposed to happen - logger.error("Error while unmapping segments", e); + logger.error("Error while closing mmapped regions", err); } + + super.tidy(); } } @@ -140,104 +98,64 @@ public class MmappedSegmentedFile extends SegmentedFile */ static class Builder extends SegmentedFile.Builder { - // planned segment boundaries - private List boundaries; - - // offset of the open segment (first segment begins at 0). - private long currentStart = 0; - - // current length of the open segment. - // used to allow merging multiple too-large-to-mmap segments, into a single buffered segment. - private long currentSize = 0; + private MmappedRegions regions; - public Builder() + Builder() { super(); - boundaries = new ArrayList<>(); - boundaries.add(0L); - } - - public void addPotentialBoundary(long boundary) - { - if (boundary - currentStart <= MAX_SEGMENT_SIZE) - { - // boundary fits into current segment: expand it - currentSize = boundary - currentStart; - return; - } - - // close the current segment to try and make room for the boundary - if (currentSize > 0) - { - currentStart += currentSize; - boundaries.add(currentStart); - } - currentSize = boundary - currentStart; - - // if we couldn't make room, the boundary needs its own segment - if (currentSize > MAX_SEGMENT_SIZE) - { - currentStart = boundary; - boundaries.add(currentStart); - currentSize = 0; - } } public SegmentedFile complete(ChannelProxy channel, int bufferSize, long overrideLength) { long length = overrideLength > 0 ? overrideLength : channel.size(); - // create the segments - return new MmappedSegmentedFile(channel, bufferSize, length, createSegments(channel, length)); + updateRegions(channel, length); + + return new MmappedSegmentedFile(channel, bufferSize, length, regions.sharedCopy()); } - private Segment[] createSegments(ChannelProxy channel, long length) + private void updateRegions(ChannelProxy channel, long length) { - // if we're early finishing a range that doesn't span multiple segments, but the finished file now does, - // we remove these from the end (we loop incase somehow this spans multiple segments, but that would - // be a loco dataset - while (length < boundaries.get(boundaries.size() - 1)) - boundaries.remove(boundaries.size() -1); - - // add a sentinel value == length - List boundaries = new ArrayList<>(this.boundaries); - if (length != boundaries.get(boundaries.size() - 1)) - boundaries.add(length); - - int segcount = boundaries.size() - 1; - Segment[] segments = new Segment[segcount]; - for (int i = 0; i < segcount; i++) + if (regions != null && !regions.isValid(channel)) { - long start = boundaries.get(i); - long size = boundaries.get(i + 1) - start; - MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE - ? 
-        private Segment[] createSegments(ChannelProxy channel, long length)
+        private void updateRegions(ChannelProxy channel, long length)
         {
-            // if we're early finishing a range that doesn't span multiple segments, but the finished file now does,
-            // we remove these from the end (we loop in case somehow this spans multiple segments, but that would
-            // be a loco dataset)
-            while (length < boundaries.get(boundaries.size() - 1))
-                boundaries.remove(boundaries.size() - 1);
-
-            // add a sentinel value == length
-            List<Long> boundaries = new ArrayList<>(this.boundaries);
-            if (length != boundaries.get(boundaries.size() - 1))
-                boundaries.add(length);
-
-            int segcount = boundaries.size() - 1;
-            Segment[] segments = new Segment[segcount];
-            for (int i = 0; i < segcount; i++)
+            if (regions != null && !regions.isValid(channel))
             {
-                long start = boundaries.get(i);
-                long size = boundaries.get(i + 1) - start;
-                MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
-                                           ? channel.map(FileChannel.MapMode.READ_ONLY, start, size)
-                                           : null;
-                segments[i] = new Segment(start, segment);
+                Throwable err = regions.close(null);
+                if (err != null)
+                    logger.error("Failed to close mapped regions", err);
+
+                regions = null;
             }
-            return segments;
+
+            if (regions == null)
+                regions = MmappedRegions.map(channel, length);
+            else
+                regions.extend(length);
         }
 
         @Override
-        public void serializeBounds(DataOutput out) throws IOException
+        public void serializeBounds(DataOutput out, Version version) throws IOException
        {
-            super.serializeBounds(out);
-            out.writeInt(boundaries.size());
-            for (long position: boundaries)
-                out.writeLong(position);
+            if (!version.hasBoundaries())
+                return;
+
+            super.serializeBounds(out, version);
+            out.writeInt(0);
         }
 
         @Override
-        public void deserializeBounds(DataInput in) throws IOException
+        public void deserializeBounds(DataInput in, Version version) throws IOException
         {
-            super.deserializeBounds(in);
+            if (!version.hasBoundaries())
+                return;
 
-            int size = in.readInt();
-            List<Long> temp = new ArrayList<>(size);
-
-            for (int i = 0; i < size; i++)
-                temp.add(in.readLong());
+            super.deserializeBounds(in, version);
+            in.skipBytes(in.readInt() * TypeSizes.sizeof(0L));
+        }
 
-            boundaries = temp;
+        @Override
+        public Throwable close(Throwable accumulate)
+        {
+            return super.close(regions == null
+                               ? accumulate
+                               : regions.close(accumulate));
         }
     }
 }
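Editor's note: the new serializeBounds()/deserializeBounds() keep the on-disk layout stable across this change: writers now record an empty boundary list, and readers skip over however many boundaries an older writer recorded. A self-contained sketch of that write-zero/skip-on-read pattern using plain java.io streams (method names hypothetical; the real code also writes a header via super and uses TypeSizes):

    import java.io.*;

    public class BoundsCompat
    {
        static void writeBounds(DataOutput out) throws IOException
        {
            out.writeInt(0); // boundary count: always zero now
        }

        static void skipBounds(DataInput in) throws IOException
        {
            // consume however many 8-byte boundaries were recorded
            in.skipBytes(in.readInt() * Long.BYTES);
        }

        public static void main(String[] args) throws IOException
        {
            // An "old" writer that recorded two boundaries...
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(2);
            out.writeLong(0L);
            out.writeLong(1024L);

            // ...can still be read: the new reader simply skips them.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            skipBounds(in);
            System.out.println("bytes left: " + in.available()); // 0
        }
    }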
http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce63ccc8/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
index f6c939a..e599a69 100644
--- a/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
+++ b/src/java/org/apache/cassandra/io/util/NIODataInputStream.java
@@ -18,17 +18,11 @@ package org.apache.cassandra.io.util;
 
 import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 
-import org.apache.cassandra.utils.vint.VIntCoding;
-
 import com.google.common.base.Preconditions;
 
 /**
@@ -43,357 +37,59 @@ import com.google.common.base.Preconditions;
  *
  * NIODataInputStream is not thread safe.
  */
-public class NIODataInputStream extends InputStream implements DataInputPlus, Closeable
+public class NIODataInputStream extends RebufferingInputStream
 {
-    private final ReadableByteChannel rbc;
-    private final ByteBuffer buf;
-
-    /*
-     * Used when wrapping a fixed buffer of data instead of a channel. Should never attempt
-     * to read from it.
-     */
-    private static final ReadableByteChannel emptyReadableByteChannel = new ReadableByteChannel()
-    {
-        @Override
-        public boolean isOpen()
-        {
-            return true;
-        }
-
-        @Override
-        public void close() throws IOException
-        {
-        }
-
-        @Override
-        public int read(ByteBuffer dst) throws IOException
-        {
-            throw new AssertionError();
-        }
-    };
-
-    public NIODataInputStream(ReadableByteChannel rbc, int bufferSize)
-    {
-        Preconditions.checkNotNull(rbc);
-        Preconditions.checkArgument(bufferSize >= 9, "Buffer size must be large enough to accommodate a varint");
-        this.rbc = rbc;
-        buf = ByteBuffer.allocateDirect(bufferSize);
-        buf.position(0);
-        buf.limit(0);
-    }
-
-    protected NIODataInputStream(ByteBuffer buf, boolean duplicate)
-    {
-        Preconditions.checkNotNull(buf);
-        if (duplicate)
-            this.buf = buf.duplicate();
-        else
-            this.buf = buf;
-
-        this.rbc = emptyReadableByteChannel;
-    }
-
-    /*
-     * The decision to duplicate or not needs to be a conscious one, since it has a real impact
-     * on thread safety, so don't expose this constructor with an implicit default.
-     */
-    protected NIODataInputStream(ByteBuffer buf)
-    {
-        this(buf, false);
-    }
-
-    @Override
-    public void readFully(byte[] b) throws IOException
-    {
-        readFully(b, 0, b.length);
-    }
-
-    @Override
-    public void readFully(byte[] b, int off, int len) throws IOException
-    {
-        int copied = 0;
-        while (copied < len)
-        {
-            int read = read(b, off + copied, len - copied);
-            if (read < 0)
-                throw new EOFException();
-            copied += read;
-        }
-    }
-
-    @Override
-    public int read(byte b[], int off, int len) throws IOException {
-        if (b == null)
-            throw new NullPointerException();
-
-        // avoid int overflow
-        if (off < 0 || off > b.length || len < 0
-                || len > b.length - off)
-            throw new IndexOutOfBoundsException();
-
-        if (len == 0)
-            return 0;
-
-        int copied = 0;
-        while (copied < len)
-        {
-            if (buf.hasRemaining())
-            {
-                int toCopy = Math.min(len - copied, buf.remaining());
-                buf.get(b, off + copied, toCopy);
-                copied += toCopy;
-            }
-            else
-            {
-                int read = readNext();
-                if (read < 0 && copied == 0) return -1;
-                if (read <= 0) return copied;
-            }
-        }
-
-        return copied;
-    }
-
-    /*
-     * Refill the buffer, preserving any unread bytes remaining in the buffer.
-     */
-    protected int readNext() throws IOException
-    {
-        Preconditions.checkState(buf.remaining() != buf.capacity());
-        assert(buf.remaining() < 9);
-
-        /*
-         * If there is data already at the start of the buffer, move the position to the end;
-         * if there is data but not at the start, move it to the start;
-         * otherwise move the position to 0 so writes start at the beginning of the buffer.
-         *
-         * We go to the trouble of shuffling the remaining bytes for cases where the buffer isn't fully
-         * drained while retrieving a multi-byte value when the position is in the middle.
-         */
-        if (buf.position() == 0 && buf.hasRemaining())
-        {
-            buf.position(buf.limit());
-        }
-        else if (buf.hasRemaining())
-        {
-            // FastByteOperations.copy failed to do the copy, so inline a simple one here
-            int position = buf.position();
-            int remaining = buf.remaining();
-            buf.clear();
-            for (int ii = 0; ii < remaining; ii++)
-                buf.put(buf.get(position + ii));
-        }
-        else
-        {
-            buf.position(0);
-        }
-
-        buf.limit(buf.capacity());
-
-        int read = 0;
-        while ((read = rbc.read(buf)) == 0) {}
-
-        buf.flip();
-
-        return read;
-    }
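Editor's note: readNext() hand-rolls a move-to-front of the unread bytes before refilling, apparently because of the FastByteOperations issue its comment mentions. ByteBuffer.compact() expresses the same dance in one call. A sketch of the equivalent refill for a buffer kept in "read" mode between calls, assuming a blocking channel:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ReadableByteChannel;

    public class Refill
    {
        static int refill(ReadableByteChannel channel, ByteBuffer buf) throws IOException
        {
            buf.compact();                 // unread bytes to the front, switch to "write" mode
            int read = channel.read(buf);  // append fresh bytes after them
            buf.flip();                    // back to "read" mode
            return read;                   // bytes read, or -1 at end of stream
        }
    }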
-
-    /*
-     * Read the minimum number of bytes, throwing EOFException if the minimum could not be read.
-     */
-    private void readMinimum(int minimum) throws IOException
-    {
-        assert(buf.remaining() < 8);
-        while (buf.remaining() < minimum)
-        {
-            int read = readNext();
-            if (read == -1)
-            {
-                // DataInputStream consumes the bytes even if it doesn't get the entire value; match that behavior here
-                buf.position(0);
-                buf.limit(0);
-                throw new EOFException();
-            }
-        }
-    }
-
-    /*
-     * Ensure the buffer contains the minimum number of readable bytes; throws EOFException if enough bytes aren't available.
-     */
-    private void prepareReadPrimitive(int minimum) throws IOException
-    {
-        if (buf.remaining() < minimum)
-            readMinimum(minimum);
-    }
-
-    @Override
-    public int skipBytes(int n) throws IOException
-    {
-        int skipped = 0;
-
-        while (skipped < n)
-        {
-            int skippedThisTime = (int)skip(n - skipped);
-            if (skippedThisTime <= 0) break;
-            skipped += skippedThisTime;
-        }
-
-        return skipped;
-    }
-
-    @Override
-    public boolean readBoolean() throws IOException
-    {
-        prepareReadPrimitive(1);
-        return buf.get() != 0;
-    }
-
-    @Override
-    public byte readByte() throws IOException
-    {
-        prepareReadPrimitive(1);
-        return buf.get();
-    }
-
-    @Override
-    public int readUnsignedByte() throws IOException
-    {
-        prepareReadPrimitive(1);
-        return buf.get() & 0xff;
-    }
-
-    @Override
-    public short readShort() throws IOException
-    {
-        prepareReadPrimitive(2);
-        return buf.getShort();
-    }
-
-    @Override
-    public int readUnsignedShort() throws IOException
-    {
-        return readShort() & 0xFFFF;
-    }
-
-    @Override
-    public char readChar() throws IOException
-    {
-        prepareReadPrimitive(2);
-        return buf.getChar();
-    }
-
-    @Override
-    public int readInt() throws IOException
-    {
-        prepareReadPrimitive(4);
-        return buf.getInt();
-    }
+    protected final ReadableByteChannel channel;
 
-    @Override
-    public long readLong() throws IOException
+    private static ByteBuffer makeBuffer(int bufferSize)
     {
-        prepareReadPrimitive(8);
-        return buf.getLong();
-    }
+        ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
+        buffer.position(0);
+        buffer.limit(0);
 
-    public long readVInt() throws IOException
-    {
-        return VIntCoding.decodeZigZag64(readUnsignedVInt());
+        return buffer;
     }
 
-    public long readUnsignedVInt() throws IOException
+    public NIODataInputStream(ReadableByteChannel channel, ByteBuffer buffer)
     {
-        // If 9 bytes aren't available, use the slow path in VIntCoding
-        if (buf.remaining() < 9)
-            return VIntCoding.readUnsignedVInt(this);
+        super(buffer);
 
-        byte firstByte = buf.get();
-
-        // Bail out early if this is one byte; necessary, or it fails later
-        if (firstByte >= 0)
-            return firstByte;
-
-        int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte);
-
-        int position = buf.position();
-        int extraBits = extraBytes * 8;
-
-        long retval = buf.getLong(position);
-        if (buf.order() == ByteOrder.LITTLE_ENDIAN)
-            retval = Long.reverseBytes(retval);
-        buf.position(position + extraBytes);
-
-        // truncate the bytes we read in excess of those we needed
-        retval >>>= 64 - extraBits;
-        // remove the non-value bits from the first byte
-        firstByte &= VIntCoding.firstByteValueMask(extraBytes);
-        // shift the first byte up to its correct position
-        retval |= (long) firstByte << extraBits;
-        return retval;
+        Preconditions.checkNotNull(channel);
+        this.channel = channel;
     }
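Editor's note: readVInt() above is just readUnsignedVInt() plus zigzag decoding via VIntCoding.decodeZigZag64(). Zigzag (the protobuf-style mapping, which to the best of my knowledge is also what VIntCoding implements) folds small negative numbers into small unsigned values so they encode in few bytes. A standalone sketch of the mapping:

    public class ZigZag
    {
        static long encodeZigZag64(long n)
        {
            return (n << 1) ^ (n >> 63);   // sign bit spread across all bits, then xor
        }

        static long decodeZigZag64(long n)
        {
            return (n >>> 1) ^ -(n & 1);   // inverse of the above
        }

        public static void main(String[] args)
        {
            for (long v : new long[]{ 0, -1, 1, -2, 2 })
                System.out.println(v + " -> " + encodeZigZag64(v) + " -> "
                                   + decodeZigZag64(encodeZigZag64(v)));
            // prints 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, 2 -> 4, each decoding back to v
        }
    }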
-    @Override
-    public float readFloat() throws IOException
+    public NIODataInputStream(ReadableByteChannel channel, int bufferSize)
     {
-        prepareReadPrimitive(4);
-        return buf.getFloat();
+        this(channel, makeBuffer(bufferSize));
     }
 
     @Override
-    public double readDouble() throws IOException
+    protected void reBuffer() throws IOException
     {
-        prepareReadPrimitive(8);
-        return buf.getDouble();
-    }
+        Preconditions.checkState(buffer.remaining() == 0);
+        buffer.clear();
 
-    @Override
-    public String readLine() throws IOException
-    {
-        throw new UnsupportedOperationException();
-    }
+        while ((channel.read(buffer)) == 0) {}
 
-    @Override
-    public String readUTF() throws IOException
-    {
-        return DataInputStream.readUTF(this);
+        buffer.flip();
     }
 
     @Override
     public void close() throws IOException
     {
-        rbc.close();
-    }
-
-    @Override
-    public int read() throws IOException
-    {
-        return readUnsignedByte();
+        channel.close();
+        super.close();
     }
 
     @Override
     public int available() throws IOException
     {
-        if (rbc instanceof SeekableByteChannel)
+        if (channel instanceof SeekableByteChannel)
         {
-            SeekableByteChannel sbc = (SeekableByteChannel)rbc;
+            SeekableByteChannel sbc = (SeekableByteChannel) channel;
             long remainder = Math.max(0, sbc.size() - sbc.position());
-            return (remainder > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)(remainder + buf.remaining());
+            return (remainder > Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)(remainder + buffer.remaining());
         }
 
-        return buf.remaining();
-    }
-
-    @Override
-    public void reset() throws IOException
-    {
-        throw new IOException("mark/reset not supported");
-    }
-
-    @Override
-    public boolean markSupported()
-    {
-        return false;
+        return buffer.remaining();
     }
 }
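Editor's note: one nit in available() above: the guard clamps remainder alone, so when remainder sits just below Integer.MAX_VALUE, adding buffer.remaining() can still overflow the int cast. A saturating variant that clamps the sum instead (standalone; parameter names are illustrative):

    public class SaturatingAvailable
    {
        static int available(long channelRemainder, int buffered)
        {
            // clamp the combined channel remainder and buffered byte count
            return (int) Math.min(Integer.MAX_VALUE, channelRemainder + buffered);
        }

        public static void main(String[] args)
        {
            System.out.println(available(Integer.MAX_VALUE - 1L, 10)); // 2147483647, no wraparound
        }
    }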