From: blue@apache.org
To: commits@parquet.apache.org
Reply-To: dev@parquet.apache.org
Date: Wed, 21 Feb 2018 17:40:12 -0000
Subject: [1/2] parquet-mr git commit: PARQUET-787: Limit read allocation size

Repository: parquet-mr
Updated Branches:
  refs/heads/master ad80bfe55 -> 8bbc6cb95

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 6e593c2..1512a24 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -23,11 +23,11 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
+import java.util.Arrays;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
  */
 abstract public class BytesInput {
   private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class);
-  private static final boolean DEBUG = false;//Log.DEBUG;
   private static final EmptyBytesInput EMPTY_BYTES_INPUT = new EmptyBytesInput();
 
   /**
@@ -75,14 +74,27 @@ abstract public class BytesInput {
   public static BytesInput from(InputStream in, int bytes) {
     return new StreamBytesInput(in, bytes);
   }
-
+
   /**
-   * @param buffer
-   * @param length number of bytes to read
-   * @return a BytesInput that will read the given bytes from the ByteBuffer
+   * @param buffers
+   * @return a BytesInput that will read the given bytes from the ByteBuffers
    */
-  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
-    return new ByteBufferBytesInput(buffer, offset, length);
+  public static BytesInput from(ByteBuffer... buffers) {
+    if (buffers.length == 1) {
+      return new ByteBufferBytesInput(buffers[0]);
+    }
+    return new BufferListBytesInput(Arrays.asList(buffers));
+  }
+
+  /**
+   * @param buffers
+   * @return a BytesInput that will read the given bytes from the ByteBuffers
+   */
+  public static BytesInput from(List<ByteBuffer> buffers) {
+    if (buffers.size() == 1) {
+      return new ByteBufferBytesInput(buffers.get(0));
+    }
+    return new BufferListBytesInput(buffers);
   }
 
   /**
@@ -208,8 +220,8 @@ abstract public class BytesInput {
    * @return a new InputStream materializing the contents of this input
    * @throws IOException
    */
-  public InputStream toInputStream() throws IOException {
-    return new ByteBufferInputStream(toByteBuffer());
+  public ByteBufferInputStream toInputStream() throws IOException {
+    return ByteBufferInputStream.wrap(toByteBuffer());
   }
 
   /**
@@ -439,7 +451,7 @@ abstract public class BytesInput {
     }
 
     public ByteBuffer toByteBuffer() throws IOException {
-      return ByteBuffer.wrap(in, offset, length);
+      return java.nio.ByteBuffer.wrap(in, offset, length);
     }
 
     @Override
@@ -448,34 +460,31 @@ abstract public class BytesInput {
     }
   }
 
-
-  private static class ByteBufferBytesInput extends BytesInput {
-
-    private final ByteBuffer byteBuf;
-    private final int length;
-    private final int offset;
-
-    private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
-      this.byteBuf = byteBuf;
-      this.offset = offset;
-      this.length = length;
+  private static class BufferListBytesInput extends BytesInput {
+    private final List<ByteBuffer> buffers;
+    private final long length;
+
+    public BufferListBytesInput(List<ByteBuffer> buffers) {
+      this.buffers = buffers;
+      long totalLen = 0;
+      for (ByteBuffer buffer : buffers) {
+        totalLen += buffer.remaining();
+      }
+      this.length = totalLen;
     }
 
     @Override
     public void writeAllTo(OutputStream out) throws IOException {
-      final WritableByteChannel outputChannel = Channels.newChannel(out);
-      byteBuf.position(offset);
-      ByteBuffer tempBuf = byteBuf.slice();
-      tempBuf.limit(length);
-      outputChannel.write(tempBuf);
+      WritableByteChannel channel = Channels.newChannel(out);
+      for (ByteBuffer buffer : buffers) {
+        channel.write(buffer.duplicate());
+      }
     }
-
+
     @Override
-    public ByteBuffer toByteBuffer() throws IOException {
-      byteBuf.position(offset);
-      ByteBuffer buf = byteBuf.slice();
-      buf.limit(length);
-      return buf;
+    public ByteBufferInputStream toInputStream() {
+      return ByteBufferInputStream.wrap(buffers);
     }
 
     @Override
@@ -483,4 +492,27 @@ abstract public class BytesInput {
       return length;
     }
   }
+
+  private static class ByteBufferBytesInput extends BytesInput {
+    private final ByteBuffer buffer;
+
+    private ByteBufferBytesInput(ByteBuffer buffer) {
+      this.buffer = buffer;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      Channels.newChannel(out).write(buffer.duplicate());
+    }
+
+    @Override
+    public ByteBufferInputStream toInputStream() {
+      return ByteBufferInputStream.wrap(buffer);
+    }
+
+    @Override
+    public long size() {
+      return buffer.remaining();
+    }
+  }
 }
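A minimal sketch of how the new factory methods behave (buffer contents here are
illustrative only): a single buffer is wrapped by ByteBufferBytesInput, while
several buffers go through BufferListBytesInput, whose size() sums the buffers'
remaining bytes.

    import java.nio.ByteBuffer;
    import org.apache.parquet.bytes.BytesInput;

    ByteBuffer first = ByteBuffer.wrap(new byte[] { 1, 2, 3 });
    ByteBuffer second = ByteBuffer.wrap(new byte[] { 4, 5 });

    BytesInput single = BytesInput.from(first);         // ByteBufferBytesInput
    BytesInput multi = BytesInput.from(first, second);  // BufferListBytesInput

    // size() is remaining() for one buffer, the sum of remaining() otherwise
    assert single.size() == 3;
    assert multi.size() == 5;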
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java
new file mode 100644
index 0000000..20a142b
--- /dev/null
+++ 
b/parquet-common/src/main/java/org/apache/parquet/bytes/MultiBufferInputStream.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +class MultiBufferInputStream extends ByteBufferInputStream { + private static final ByteBuffer EMPTY = ByteBuffer.allocate(0); + + private final List buffers; + private final long length; + + private Iterator iterator; + private ByteBuffer current = EMPTY; + private long position = 0; + + private long mark = -1; + private long markLimit = 0; + private List markBuffers = new ArrayList<>(); + + MultiBufferInputStream(List buffers) { + this.buffers = buffers; + + long totalLen = 0; + for (ByteBuffer buffer : buffers) { + totalLen += buffer.remaining(); + } + this.length = totalLen; + + this.iterator = buffers.iterator(); + + nextBuffer(); + } + + /** + * Returns the position in the stream. + */ + public long position() { + return position; + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + + if (current == null) { + return -1; + } + + long bytesSkipped = 0; + while (bytesSkipped < n) { + if (current.remaining() > 0) { + long bytesToSkip = Math.min(n - bytesSkipped, current.remaining()); + current.position(current.position() + (int) bytesToSkip); + bytesSkipped += bytesToSkip; + this.position += bytesToSkip; + } else if (!nextBuffer()) { + // there are no more buffers + return bytesSkipped > 0 ? bytesSkipped : -1; + } + } + + return bytesSkipped; + } + + @Override + public int read(ByteBuffer out) { + int len = out.remaining(); + if (len <= 0) { + return 0; + } + + if (current == null) { + return -1; + } + + int bytesCopied = 0; + while (bytesCopied < len) { + if (current.remaining() > 0) { + int bytesToCopy; + ByteBuffer copyBuffer; + if (current.remaining() <= out.remaining()) { + // copy all of the current buffer + bytesToCopy = current.remaining(); + copyBuffer = current; + } else { + // copy a slice of the current buffer + bytesToCopy = out.remaining(); + copyBuffer = current.duplicate(); + copyBuffer.limit(copyBuffer.position() + bytesToCopy); + current.position(copyBuffer.position() + bytesToCopy); + } + + out.put(copyBuffer); + bytesCopied += bytesToCopy; + this.position += bytesToCopy; + + } else if (!nextBuffer()) { + // there are no more buffers + return bytesCopied > 0 ? 
bytesCopied : -1; + } + } + + return bytesCopied; + } + + @Override + public ByteBuffer slice(int length) throws EOFException { + if (length <= 0) { + return EMPTY; + } + + if (current == null) { + throw new EOFException(); + } + + ByteBuffer slice; + if (length > current.remaining()) { + // a copy is needed to return a single buffer + // TODO: use an allocator + slice = ByteBuffer.allocate(length); + int bytesCopied = read(slice); + slice.flip(); + if (bytesCopied < length) { + throw new EOFException(); + } + } else { + slice = current.duplicate(); + slice.limit(slice.position() + length); + current.position(slice.position() + length); + this.position += length; + } + + return slice; + } + + public List sliceBuffers(long len) throws EOFException { + if (len <= 0) { + return Collections.emptyList(); + } + + if (current == null) { + throw new EOFException(); + } + + List buffers = new ArrayList<>(); + long bytesAccumulated = 0; + while (bytesAccumulated < len) { + if (current.remaining() > 0) { + // get a slice of the current buffer to return + // always fits in an int because remaining returns an int that is >= 0 + int bufLen = (int) Math.min(len - bytesAccumulated, current.remaining()); + ByteBuffer slice = current.duplicate(); + slice.limit(slice.position() + bufLen); + buffers.add(slice); + bytesAccumulated += bufLen; + + // update state; the bytes are considered read + current.position(current.position() + bufLen); + this.position += bufLen; + } else if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } + } + + return buffers; + } + + @Override + public List remainingBuffers() { + if (position >= length) { + return Collections.emptyList(); + } + + try { + return sliceBuffers(length - position); + } catch (EOFException e) { + throw new RuntimeException( + "[Parquet bug] Stream is bad: incorrect bytes remaining " + + (length - position)); + } + } + + @Override + public int read(byte[] bytes, int off, int len) { + if (len <= 0) { + if (len < 0) { + throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len); + } + return 0; + } + + if (current == null) { + return -1; + } + + int bytesRead = 0; + while (bytesRead < len) { + if (current.remaining() > 0) { + int bytesToRead = Math.min(len - bytesRead, current.remaining()); + current.get(bytes, off + bytesRead, bytesToRead); + bytesRead += bytesToRead; + this.position += bytesToRead; + } else if (!nextBuffer()) { + // there are no more buffers + return bytesRead > 0 ? 
bytesRead : -1; + } + } + + return bytesRead; + } + + @Override + public int read(byte[] bytes) { + return read(bytes, 0, bytes.length); + } + + @Override + public int read() throws IOException { + if (current == null) { + throw new EOFException(); + } + + while (true) { + if (current.remaining() > 0) { + this.position += 1; + return current.get() & 0xFF; // as unsigned + } else if (!nextBuffer()) { + // there are no more buffers + throw new EOFException(); + } + } + } + + @Override + public int available() { + long remaining = length - position; + if (remaining > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) remaining; + } + } + + @Override + public void mark(int readlimit) { + if (mark >= 0) { + discardMark(); + } + this.mark = position; + this.markLimit = mark + readlimit + 1; + if (current != null) { + markBuffers.add(current.duplicate()); + } + } + + @Override + public void reset() throws IOException { + if (mark >= 0 && position < markLimit) { + this.position = mark; + // replace the current iterator with one that adds back the buffers that + // have been used since mark was called. + this.iterator = concat(markBuffers.iterator(), iterator); + discardMark(); + nextBuffer(); // go back to the marked buffers + } else { + throw new IOException("No mark defined or has read past the previous mark limit"); + } + } + + private void discardMark() { + this.mark = -1; + this.markLimit = 0; + markBuffers = new ArrayList<>(); + } + + @Override + public boolean markSupported() { + return true; + } + + private boolean nextBuffer() { + if (!iterator.hasNext()) { + this.current = null; + return false; + } + + this.current = iterator.next().duplicate(); + + if (mark >= 0) { + if (position < markLimit) { + // the mark is defined and valid. 
save the new buffer + markBuffers.add(current.duplicate()); + } else { + // the mark has not been used and is no longer valid + discardMark(); + } + } + + return true; + } + + private static Iterator concat(Iterator first, Iterator second) { + return new ConcatIterator<>(first, second); + } + + private static class ConcatIterator implements Iterator { + private final Iterator first; + private final Iterator second; + boolean useFirst = true; + + public ConcatIterator(Iterator first, Iterator second) { + this.first = first; + this.second = second; + } + + @Override + public boolean hasNext() { + if (useFirst) { + if (first.hasNext()) { + return true; + } else { + useFirst = false; + return second.hasNext(); + } + } + return second.hasNext(); + } + + @Override + public E next() { + if (useFirst && !first.hasNext()) { + useFirst = false; + } + + if (!useFirst && !second.hasNext()) { + throw new NoSuchElementException(); + } + + if (useFirst) { + return first.next(); + } + + return second.next(); + } + + @Override + public void remove() { + if (useFirst) { + first.remove(); + } + second.remove(); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java new file mode 100644 index 0000000..999d1bb --- /dev/null +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/SingleBufferInputStream.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.bytes; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; + +/** + * This ByteBufferInputStream does not consume the ByteBuffer being passed in, + * but will create a slice of the current buffer. 
+ */ +class SingleBufferInputStream extends ByteBufferInputStream { + + private final ByteBuffer buffer; + private final long startPosition; + private int mark = -1; + + SingleBufferInputStream(ByteBuffer buffer) { + // duplicate the buffer because its state will be modified + this.buffer = buffer.duplicate(); + this.startPosition = buffer.position(); + } + + @Override + public long position() { + // position is relative to the start of the stream, not the buffer + return buffer.position() - startPosition; + } + + @Override + public int read() throws IOException { + if (!buffer.hasRemaining()) { + throw new EOFException(); + } + return buffer.get() & 0xFF; // as unsigned + } + + @Override + public int read(byte[] bytes, int offset, int length) throws IOException { + if (length == 0) { + return 0; + } + + int remaining = buffer.remaining(); + if (remaining <= 0) { + return -1; + } + + int bytesToRead = Math.min(buffer.remaining(), length); + buffer.get(bytes, offset, bytesToRead); + + return bytesToRead; + } + + @Override + public long skip(long n) { + if (n == 0) { + return 0; + } + + if (buffer.remaining() <= 0) { + return -1; + } + + // buffer.remaining is an int, so this will always fit in an int + int bytesToSkip = (int) Math.min(buffer.remaining(), n); + buffer.position(buffer.position() + bytesToSkip); + + return bytesToSkip; + } + + @Override + public int read(ByteBuffer out) { + int bytesToCopy; + ByteBuffer copyBuffer; + if (buffer.remaining() <= out.remaining()) { + // copy all of the buffer + bytesToCopy = buffer.remaining(); + copyBuffer = buffer; + } else { + // copy a slice of the current buffer + bytesToCopy = out.remaining(); + copyBuffer = buffer.duplicate(); + copyBuffer.limit(buffer.position() + bytesToCopy); + buffer.position(buffer.position() + bytesToCopy); + } + + out.put(copyBuffer); + out.flip(); + + return bytesToCopy; + } + + @Override + public ByteBuffer slice(int length) throws EOFException { + if (buffer.remaining() < length) { + throw new EOFException(); + } + + // length is less than remaining, so it must fit in an int + ByteBuffer copy = buffer.duplicate(); + copy.limit(copy.position() + length); + buffer.position(buffer.position() + length); + + return copy; + } + + @Override + public List sliceBuffers(long length) throws EOFException { + if (length == 0) { + return Collections.emptyList(); + } + + if (length > buffer.remaining()) { + throw new EOFException(); + } + + // length is less than remaining, so it must fit in an int + return Collections.singletonList(slice((int) length)); + } + + @Override + public List remainingBuffers() { + if (buffer.remaining() <= 0) { + return Collections.emptyList(); + } + + ByteBuffer remaining = buffer.duplicate(); + buffer.position(buffer.limit()); + + return Collections.singletonList(remaining); + } + + @Override + public void mark(int readlimit) { + this.mark = buffer.position(); + } + + @Override + public void reset() throws IOException { + if (mark >= 0) { + buffer.position(mark); + this.mark = -1; + } else { + throw new IOException("No mark defined"); + } + } + + @Override + public boolean markSupported() { + return true; + } + + @Override + public int available() { + return buffer.remaining(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java new file mode 100644 index 0000000..7bed2a9 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestByteBufferInputStreams.java @@ -0,0 +1,597 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import org.junit.Assert; +import org.junit.Test; +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; + +public abstract class TestByteBufferInputStreams { + + protected abstract ByteBufferInputStream newStream(); + protected abstract void checkOriginalData(); + + @Test + public void testRead0() throws Exception { + byte[] bytes = new byte[0]; + + ByteBufferInputStream stream = newStream(); + + Assert.assertEquals("Should read 0 bytes", 0, stream.read(bytes)); + + int bytesRead = stream.read(new byte[100]); + Assert.assertTrue("Should read to end of stream", bytesRead < 100); + + Assert.assertEquals("Should read 0 bytes at end of stream", + 0, stream.read(bytes)); + } + + @Test + public void testReadAll() throws Exception { + byte[] bytes = new byte[35]; + + ByteBufferInputStream stream = newStream(); + + int bytesRead = stream.read(bytes); + Assert.assertEquals("Should read the entire buffer", + bytes.length, bytesRead); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + Assert.assertEquals("Should advance position", 35, stream.position()); + } + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + checkOriginalData(); + } + + @Test + public void testSmallReads() throws Exception { + for (int size = 1; size < 36; size += 1) { + byte[] bytes = new byte[size]; + + ByteBufferInputStream stream = newStream(); + long length = stream.available(); + + int lastBytesRead = bytes.length; + for (int offset = 0; offset < length; offset += bytes.length) { + Assert.assertEquals("Should read requested len", + bytes.length, lastBytesRead); + + lastBytesRead = stream.read(bytes, 0, bytes.length); + + Assert.assertEquals("Should advance position", + offset + lastBytesRead, stream.position()); + + // validate the bytes that were read + for (int i = 0; i < lastBytesRead; i += 1) { + Assert.assertEquals("Byte i should be i", offset + i, bytes[i]); + } + } + + Assert.assertEquals("Should read fewer bytes at end of buffer", + length % bytes.length, lastBytesRead % bytes.length); + + 
Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + } + + checkOriginalData(); + } + + @Test + public void testPartialBufferReads() throws Exception { + for (int size = 1; size < 35; size += 1) { + byte[] bytes = new byte[33]; + + ByteBufferInputStream stream = newStream(); + + int lastBytesRead = size; + for (int offset = 0; offset < bytes.length; offset += size) { + Assert.assertEquals("Should read requested len", size, lastBytesRead); + + lastBytesRead = stream.read( + bytes, offset, Math.min(size, bytes.length - offset)); + + Assert.assertEquals("Should advance position", + lastBytesRead > 0 ? offset + lastBytesRead : offset, + stream.position()); + } + + Assert.assertEquals("Should read fewer bytes at end of buffer", + bytes.length % size, lastBytesRead % size); + + for (int i = 0; i < bytes.length; i += 1) { + Assert.assertEquals("Byte i should be i", i, bytes[i]); + } + + Assert.assertEquals("Should have no more remaining content", + 2, stream.available()); + + Assert.assertEquals("Should return 2 more bytes", + 2, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + + Assert.assertEquals("Should return -1 at end of stream", + -1, stream.read(bytes)); + + Assert.assertEquals("Should have no more remaining content", + 0, stream.available()); + } + + checkOriginalData(); + } + + @Test + public void testReadByte() throws Exception { + final ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Position should increment", i, stream.position()); + Assert.assertEquals(i, stream.read()); + } + + assertThrows("Should throw EOFException at end of stream", + EOFException.class, new Callable() { + @Override + public Integer call() throws IOException { + return stream.read(); + } + }); + + checkOriginalData(); + } + + @Test + public void testSlice() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + ByteBuffer empty = stream.slice(0); + Assert.assertNotNull("slice(0) should produce a non-null buffer", empty); + Assert.assertEquals("slice(0) should produce an empty buffer", + 0, empty.remaining()); + + Assert.assertEquals("Position should be at start", 0, stream.position()); + + int i = 0; + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 10); + ByteBuffer buffer = stream.slice(bytesToSlice); + + for (int j = 0; j < bytesToSlice; j += 1) { + Assert.assertEquals("Data should be correct", i + j, buffer.get()); + } + + i += bytesToSlice; + } + + Assert.assertEquals("Position should be at end", length, stream.position()); + + checkOriginalData(); + } + + @Test + public void testSliceBuffers0() throws Exception { + ByteBufferInputStream stream = newStream(); + + Assert.assertEquals("Should return an empty list", + Collections.emptyList(), stream.sliceBuffers(0)); + } + + @Test + public void testWholeSliceBuffers() throws Exception { + final ByteBufferInputStream stream = newStream(); + final int length = stream.available(); + + List buffers = stream.sliceBuffers(stream.available()); + + Assert.assertEquals("Should consume all buffers", length, stream.position()); + + assertThrows("Should throw EOFException when empty", + EOFException.class, new 
Callable>() { + @Override + public List call() throws Exception { + return stream.sliceBuffers(length); + } + }); + + ByteBufferInputStream copy = ByteBufferInputStream.wrap(buffers); + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Slice should have identical data", i, copy.read()); + } + + checkOriginalData(); + } + + @Test + public void testSliceBuffersCoverage() throws Exception { + for (int size = 1; size < 36; size += 1) { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = new ArrayList<>(); + while (stream.available() > 0) { + buffers.addAll(stream.sliceBuffers(Math.min(size, stream.available()))); + } + + Assert.assertEquals("Should consume all content", + length, stream.position()); + + ByteBufferInputStream newStream = new MultiBufferInputStream(buffers); + + for (int i = 0; i < length; i += 1) { + Assert.assertEquals("Data should be correct", i, newStream.read()); + } + } + + checkOriginalData(); + } + + @Test + public void testSliceBuffersModification() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + int sliceLength = 5; + List buffers = stream.sliceBuffers(sliceLength); + Assert.assertEquals("Should advance the original stream", + length - sliceLength, stream.available()); + Assert.assertEquals("Should advance the original stream position", + sliceLength, stream.position()); + + Assert.assertEquals("Should return a slice of the first buffer", + 1, buffers.size()); + + ByteBuffer buffer = buffers.get(0); + Assert.assertEquals("Should have requested bytes", + sliceLength, buffer.remaining()); + + // read the buffer one past the returned limit. this should not change the + // next value in the original stream + buffer.limit(sliceLength + 1); + for (int i = 0; i < sliceLength + 1; i += 1) { + Assert.assertEquals("Should have correct data", i, buffer.get()); + } + + Assert.assertEquals("Reading a slice shouldn't advance the original stream", + sliceLength, stream.position()); + Assert.assertEquals("Reading a slice shouldn't change the underlying data", + sliceLength, stream.read()); + + // change the underlying data buffer + buffer.limit(sliceLength + 2); + int originalValue = buffer.duplicate().get(); + ByteBuffer undoBuffer = buffer.duplicate(); + + try { + buffer.put((byte) 255); + + Assert.assertEquals( + "Writing to a slice shouldn't advance the original stream", + sliceLength + 1, stream.position()); + Assert.assertEquals( + "Writing to a slice should change the underlying data", + 255, stream.read()); + + } finally { + undoBuffer.put((byte) originalValue); + } + } + + @Test + public void testSkip() throws Exception { + ByteBufferInputStream stream = newStream(); + + while (stream.available() > 0) { + int bytesToSkip = Math.min(stream.available(), 10); + Assert.assertEquals("Should skip all, regardless of backing buffers", + bytesToSkip, stream.skip(bytesToSkip)); + } + + stream = newStream(); + Assert.assertEquals(0, stream.skip(0)); + + int length = stream.available(); + Assert.assertEquals("Should stop at end when out of bytes", + length, stream.skip(length + 10)); + Assert.assertEquals("Should return -1 when at end", + -1, stream.skip(10)); + } + + @Test + public void testSkipFully() throws Exception { + ByteBufferInputStream stream = newStream(); + + long lastPosition = 0; + while (stream.available() > 0) { + int bytesToSkip = Math.min(stream.available(), 10); + + stream.skipFully(bytesToSkip); + + Assert.assertEquals("Should skip all, regardless of 
backing buffers", + bytesToSkip, stream.position() - lastPosition); + + lastPosition = stream.position(); + } + + final ByteBufferInputStream stream2 = newStream(); + stream2.skipFully(0); + Assert.assertEquals(0, stream2.position()); + + final int length = stream2.available(); + assertThrows("Should throw when out of bytes", + EOFException.class, new Callable() { + @Override + public Object call() throws Exception { + stream2.skipFully(length + 10); + return null; + } + }); + } + + @Test + public void testMark() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.read(new byte[7]); + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[100]; + int expectedBytesRead = stream.read(expected); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.position()); + + byte[] afterReset = new byte[100]; + int bytesReadAfterReset = stream.read(afterReset); + + Assert.assertEquals("Should read the same number of bytes", + expectedBytesRead, bytesReadAfterReset); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkTwice() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.read(new byte[7]); + stream.mark(1); + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[100]; + int expectedBytesRead = stream.read(expected); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.position()); + + byte[] afterReset = new byte[100]; + int bytesReadAfterReset = stream.read(afterReset); + + Assert.assertEquals("Should read the same number of bytes", + expectedBytesRead, bytesReadAfterReset); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkAtStart() throws Exception { + ByteBufferInputStream stream = newStream(); + + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[10]; + Assert.assertEquals("Should read 10 bytes", 10, stream.read(expected)); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.position()); + + byte[] afterReset = new byte[10]; + Assert.assertEquals("Should read 10 bytes", 10, stream.read(afterReset)); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkAtEnd() throws Exception { + ByteBufferInputStream stream = newStream(); + + int bytesRead = stream.read(new byte[100]); + Assert.assertTrue("Should read to end of stream", bytesRead < 100); + + stream.mark(100); + + long mark = stream.position(); + + byte[] expected = new byte[10]; + Assert.assertEquals("Should read 0 bytes", -1, stream.read(expected)); + + long end = stream.position(); + + stream.reset(); + + Assert.assertEquals("Position should return to the mark", + mark, stream.position()); + + byte[] afterReset = new byte[10]; + Assert.assertEquals("Should read 0 bytes", -1, stream.read(afterReset)); + + Assert.assertEquals("Read should end at the same position", + end, stream.position()); + + 
Assert.assertArrayEquals("Content should be equal", expected, afterReset); + } + + @Test + public void testMarkUnset() { + final ByteBufferInputStream stream = newStream(); + + assertThrows("Should throw an error for reset() without mark()", + IOException.class, new Callable() { + @Override + public Object call() throws Exception { + stream.reset(); + return null; + } + }); + } + + @Test + public void testMarkAndResetTwiceOverSameRange() throws Exception { + final ByteBufferInputStream stream = newStream(); + + byte[] expected = new byte[6]; + stream.mark(10); + Assert.assertEquals("Should read expected bytes", + expected.length, stream.read(expected)); + + stream.reset(); + stream.mark(10); + + byte[] firstRead = new byte[6]; + Assert.assertEquals("Should read firstRead bytes", + firstRead.length, stream.read(firstRead)); + + stream.reset(); + + byte[] secondRead = new byte[6]; + Assert.assertEquals("Should read secondRead bytes", + secondRead.length, stream.read(secondRead)); + + Assert.assertArrayEquals("First read should be correct", + expected, firstRead); + + Assert.assertArrayEquals("Second read should be correct", + expected, secondRead); + } + + @Test + public void testMarkLimit() throws Exception { + final ByteBufferInputStream stream = newStream(); + + stream.mark(5); + Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5])); + + stream.reset(); + + Assert.assertEquals("Should read 6 bytes", 6, stream.read(new byte[6])); + + assertThrows("Should throw an error for reset() after limit", + IOException.class, new Callable() { + @Override + public Object call() throws Exception { + stream.reset(); + return null; + } + }); + } + + @Test + public void testMarkDoubleReset() throws Exception { + final ByteBufferInputStream stream = newStream(); + + stream.mark(5); + Assert.assertEquals("Should read 5 bytes", 5, stream.read(new byte[5])); + + stream.reset(); + + assertThrows("Should throw an error for double reset()", + IOException.class, new Callable() { + @Override + public Object call() throws Exception { + stream.reset(); + return null; + } + }); + } + + /** + * A convenience method to avoid a large number of @Test(expected=...) tests + * @param message A String message to describe this assertion + * @param expected An Exception class that the Runnable should throw + * @param callable A Callable that is expected to throw the exception + */ + public static void assertThrows( + String message, Class expected, Callable callable) { + try { + callable.call(); + Assert.fail("No exception was thrown (" + message + "), expected: " + + expected.getName()); + } catch (Exception actual) { + try { + Assert.assertEquals(message, expected, actual.getClass()); + } catch (AssertionError e) { + e.addSuppressed(actual); + throw e; + } + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java new file mode 100644 index 0000000..253c986 --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestMultiBufferInputStream.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.bytes; + +import org.junit.Assert; +import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class TestMultiBufferInputStream extends TestByteBufferInputStreams { + private static final List DATA = Arrays.asList( + ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 }), + ByteBuffer.wrap(new byte[] { 9, 10, 11, 12 }), + ByteBuffer.wrap(new byte[] { }), + ByteBuffer.wrap(new byte[] { 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24 }), + ByteBuffer.wrap(new byte[] { 25 }), + ByteBuffer.wrap(new byte[] { 26, 27, 28, 29, 30, 31, 32 }), + ByteBuffer.wrap(new byte[] { 33, 34 }) + ); + + @Override + protected ByteBufferInputStream newStream() { + return new MultiBufferInputStream(DATA); + } + + @Override + protected void checkOriginalData() { + for (ByteBuffer buffer : DATA) { + Assert.assertEquals("Position should not change", 0, buffer.position()); + Assert.assertEquals("Limit should not change", + buffer.array().length, buffer.limit()); + } + } + + @Test + public void testSliceData() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = new ArrayList<>(); + // slice the stream into 3 8-byte buffers and 1 2-byte buffer + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 8); + buffers.add(stream.slice(bytesToSlice)); + } + + Assert.assertEquals("Position should be at end", length, stream.position()); + Assert.assertEquals("Should produce 5 buffers", 5, buffers.size()); + + int i = 0; + + // one is a view of the first buffer because it is smaller + ByteBuffer one = buffers.get(0); + Assert.assertSame("Should be a duplicate of the first array", + one.array(), DATA.get(0).array()); + Assert.assertEquals(8, one.remaining()); + Assert.assertEquals(0, one.position()); + Assert.assertEquals(8, one.limit()); + Assert.assertEquals(9, one.capacity()); + for (; i < 8; i += 1) { + Assert.assertEquals("Should produce correct values", i, one.get()); + } + + // two should be a copy of the next 8 bytes + ByteBuffer two = buffers.get(1); + Assert.assertEquals(8, two.remaining()); + Assert.assertEquals(0, two.position()); + Assert.assertEquals(8, two.limit()); + Assert.assertEquals(8, two.capacity()); + for (; i < 16; i += 1) { + Assert.assertEquals("Should produce correct values", i, two.get()); + } + + // three is a copy of part of the 4th buffer + ByteBuffer three = buffers.get(2); + Assert.assertSame("Should be a duplicate of the fourth array", + three.array(), DATA.get(3).array()); + Assert.assertEquals(8, three.remaining()); + Assert.assertEquals(3, three.position()); + Assert.assertEquals(11, three.limit()); + Assert.assertEquals(12, three.capacity()); + for (; i < 24; i += 1) { + 
Assert.assertEquals("Should produce correct values", i, three.get()); + } + + // four should be a copy of the next 8 bytes + ByteBuffer four = buffers.get(3); + Assert.assertEquals(8, four.remaining()); + Assert.assertEquals(0, four.position()); + Assert.assertEquals(8, four.limit()); + Assert.assertEquals(8, four.capacity()); + for (; i < 32; i += 1) { + Assert.assertEquals("Should produce correct values", i, four.get()); + } + + // five should be a copy of the next 8 bytes + ByteBuffer five = buffers.get(4); + Assert.assertEquals(3, five.remaining()); + Assert.assertEquals(0, five.position()); + Assert.assertEquals(3, five.limit()); + Assert.assertEquals(3, five.capacity()); + for (; i < 35; i += 1) { + Assert.assertEquals("Should produce correct values", i, five.get()); + } + } + + @Test + public void testSliceBuffersData() throws Exception { + ByteBufferInputStream stream = newStream(); + + List buffers = stream.sliceBuffers(stream.available()); + List nonEmptyBuffers = new ArrayList<>(); + for (ByteBuffer buffer : DATA) { + if (buffer.remaining() > 0) { + nonEmptyBuffers.add(buffer); + } + } + + Assert.assertEquals("Should return duplicates of all non-empty buffers", + nonEmptyBuffers, buffers); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java ---------------------------------------------------------------------- diff --git a/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java new file mode 100644 index 0000000..9db23be --- /dev/null +++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestSingleBufferInputStream.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.parquet.bytes; + +import org.junit.Assert; +import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class TestSingleBufferInputStream extends TestByteBufferInputStreams { + private static final ByteBuffer DATA = ByteBuffer.wrap(new byte[] { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34 }); + + @Override + protected ByteBufferInputStream newStream() { + return new SingleBufferInputStream(DATA); + } + + @Override + protected void checkOriginalData() { + Assert.assertEquals("Position should not change", 0, DATA.position()); + Assert.assertEquals("Limit should not change", + DATA.array().length, DATA.limit()); + } + + @Test + public void testSliceData() throws Exception { + ByteBufferInputStream stream = newStream(); + int length = stream.available(); + + List buffers = new ArrayList<>(); + // slice the stream into 3 8-byte buffers and 1 2-byte buffer + while (stream.available() > 0) { + int bytesToSlice = Math.min(stream.available(), 8); + buffers.add(stream.slice(bytesToSlice)); + } + + Assert.assertEquals("Position should be at end", length, stream.position()); + Assert.assertEquals("Should produce 5 buffers", 5, buffers.size()); + + int i = 0; + + ByteBuffer one = buffers.get(0); + Assert.assertSame("Should use the same backing array", + one.array(), DATA.array()); + Assert.assertEquals(8, one.remaining()); + Assert.assertEquals(0, one.position()); + Assert.assertEquals(8, one.limit()); + Assert.assertEquals(35, one.capacity()); + for (; i < 8; i += 1) { + Assert.assertEquals("Should produce correct values", i, one.get()); + } + + ByteBuffer two = buffers.get(1); + Assert.assertSame("Should use the same backing array", + two.array(), DATA.array()); + Assert.assertEquals(8, two.remaining()); + Assert.assertEquals(8, two.position()); + Assert.assertEquals(16, two.limit()); + Assert.assertEquals(35, two.capacity()); + for (; i < 16; i += 1) { + Assert.assertEquals("Should produce correct values", i, two.get()); + } + + // three is a copy of part of the 4th buffer + ByteBuffer three = buffers.get(2); + Assert.assertSame("Should use the same backing array", + three.array(), DATA.array()); + Assert.assertEquals(8, three.remaining()); + Assert.assertEquals(16, three.position()); + Assert.assertEquals(24, three.limit()); + Assert.assertEquals(35, three.capacity()); + for (; i < 24; i += 1) { + Assert.assertEquals("Should produce correct values", i, three.get()); + } + + // four should be a copy of the next 8 bytes + ByteBuffer four = buffers.get(3); + Assert.assertSame("Should use the same backing array", + four.array(), DATA.array()); + Assert.assertEquals(8, four.remaining()); + Assert.assertEquals(24, four.position()); + Assert.assertEquals(32, four.limit()); + Assert.assertEquals(35, four.capacity()); + for (; i < 32; i += 1) { + Assert.assertEquals("Should produce correct values", i, four.get()); + } + + // five should be a copy of the next 8 bytes + ByteBuffer five = buffers.get(4); + Assert.assertSame("Should use the same backing array", + five.array(), DATA.array()); + Assert.assertEquals(3, five.remaining()); + Assert.assertEquals(32, five.position()); + Assert.assertEquals(35, five.limit()); + Assert.assertEquals(35, five.capacity()); + for (; i < 35; i += 1) { + Assert.assertEquals("Should produce correct values", i, five.get()); + } + } + + @Test + public void 
testWholeSliceBuffersData() throws Exception {
+    ByteBufferInputStream stream = newStream();
+
+    List<ByteBuffer> buffers = stream.sliceBuffers(stream.available());
+    Assert.assertEquals("Should return duplicates of all non-empty buffers",
+        Collections.singletonList(DATA), buffers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 87c8ac9..8d3e48d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -37,6 +37,8 @@ import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD
 public class HadoopReadOptions extends ParquetReadOptions {
   private final Configuration conf;
 
+  private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
+
   private HadoopReadOptions(boolean useSignedStringMinMax,
                             boolean useStatsFilter,
                             boolean useDictionaryFilter,
@@ -45,11 +47,12 @@ public class HadoopReadOptions extends ParquetReadOptions {
                             MetadataFilter metadataFilter,
                             CompressionCodecFactory codecFactory,
                             ByteBufferAllocator allocator,
+                            int maxAllocationSize,
                             Map<String, String> properties,
                             Configuration conf) {
     super(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, recordFilter,
-        metadataFilter, codecFactory, allocator, properties
+        metadataFilter, codecFactory, allocator, maxAllocationSize, properties
     );
     this.conf = conf;
   }
@@ -82,6 +85,7 @@ public class HadoopReadOptions extends ParquetReadOptions {
       useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
       withCodecFactory(HadoopCodecs.newFactory(conf, 0));
       withRecordFilter(getFilter(conf));
+      withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
       String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY);
       if (badRecordThresh != null) {
         set(BAD_RECORD_THRESHOLD_CONF_KEY, badRecordThresh);
@@ -92,7 +96,8 @@ public class HadoopReadOptions extends ParquetReadOptions {
     public ParquetReadOptions build() {
       return new HadoopReadOptions(
           useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
-          recordFilter, metadataFilter, codecFactory, allocator, properties, conf);
+          recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties,
+          conf);
     }
   }
 }
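A usage sketch of the new option (the 4 MB value is arbitrary, and the
HadoopReadOptions.builder(Configuration) factory is assumed here; the property
name and the 8388608-byte default come from the diff above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;

    Configuration conf = new Configuration();
    // cap each read-side buffer allocation at 4 MB instead of the 8 MB default
    conf.setInt("parquet.read.allocation.size", 4 * 1024 * 1024);

    ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
    // options.getMaxAllocationSize() now returns 4194304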
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 5f2f0a8..4ef2460 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -38,6 +38,7 @@ public class ParquetReadOptions {
   private static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
+  private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
 
   private final boolean useSignedStringMinMax;
   private final boolean useStatsFilter;
@@ -47,17 +48,19 @@ public class ParquetReadOptions {
   private final ParquetMetadataConverter.MetadataFilter metadataFilter;
   private final CompressionCodecFactory codecFactory;
   private final ByteBufferAllocator allocator;
+  private final int maxAllocationSize;
   private final Map<String, String> properties;
 
   ParquetReadOptions(boolean useSignedStringMinMax,
-                    boolean useStatsFilter,
-                    boolean useDictionaryFilter,
-                    boolean useRecordFilter,
-                    FilterCompat.Filter recordFilter,
-                    ParquetMetadataConverter.MetadataFilter metadataFilter,
-                    CompressionCodecFactory codecFactory,
-                    ByteBufferAllocator allocator,
-                    Map<String, String> properties) {
+                     boolean useStatsFilter,
+                     boolean useDictionaryFilter,
+                     boolean useRecordFilter,
+                     FilterCompat.Filter recordFilter,
+                     ParquetMetadataConverter.MetadataFilter metadataFilter,
+                     CompressionCodecFactory codecFactory,
+                     ByteBufferAllocator allocator,
+                     int maxAllocationSize,
+                     Map<String, String> properties) {
     this.useSignedStringMinMax = useSignedStringMinMax;
     this.useStatsFilter = useStatsFilter;
     this.useDictionaryFilter = useDictionaryFilter;
@@ -66,6 +69,7 @@ public class ParquetReadOptions {
     this.metadataFilter = metadataFilter;
     this.codecFactory = codecFactory;
     this.allocator = allocator;
+    this.maxAllocationSize = maxAllocationSize;
     this.properties = Collections.unmodifiableMap(properties);
   }
 
@@ -101,6 +105,10 @@ public class ParquetReadOptions {
     return allocator;
   }
 
+  public int getMaxAllocationSize() {
+    return maxAllocationSize;
+  }
+
   public Set<String> getPropertyNames() {
     return properties.keySet();
   }
@@ -122,16 +130,17 @@ public class ParquetReadOptions {
   }
 
   public static class Builder {
-    boolean useSignedStringMinMax = false;
-    boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
-    boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
-    boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
-    FilterCompat.Filter recordFilter = null;
-    ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
+    protected boolean useSignedStringMinMax = false;
+    protected boolean useStatsFilter = STATS_FILTERING_ENABLED_DEFAULT;
+    protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
+    protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
+    protected FilterCompat.Filter recordFilter = null;
+    protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
     // the page size parameter isn't used when only using the codec factory to get decompressors
-    CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
-    ByteBufferAllocator allocator = new HeapByteBufferAllocator();
-    Map<String, String> properties = new HashMap<>();
+    protected CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    protected ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT;
+    protected Map<String, String> properties = new HashMap<>();
 
     public Builder useSignedStringMinMax(boolean useSignedStringMinMax) {
       this.useSignedStringMinMax = useSignedStringMinMax;
@@ -203,6 +212,11 @@ public class ParquetReadOptions {
       return this;
     }
 
+    public Builder withMaxAllocationInBytes(int allocationSizeInBytes) {
+      this.maxAllocationSize = allocationSizeInBytes;
+      return this;
+    }
+
     public Builder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -226,7 +240,7 @@ public class ParquetReadOptions {
     public ParquetReadOptions build() {
       return new ParquetReadOptions(
           useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
-          recordFilter, metadataFilter, codecFactory, allocator, properties);
+          recordFilter, 
metadataFilter, codecFactory, allocator, maxAllocationSize, properties); } } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 8befa79..31d7bba 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -115,7 +115,7 @@ public class CodecFactory implements CompressionCodecFactory { @Override public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException { - ByteBuffer decompressed = decompress(BytesInput.from(input, 0, input.remaining()), uncompressedSize).toByteBuffer(); + ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); output.put(decompressed); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index 58e79ac..1377999 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -302,7 +302,9 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { size = Snappy.compress(this.incoming, outgoing); } - return BytesInput.from(outgoing, 0, (int) size); + outgoing.limit(size); + + return BytesInput.from(outgoing); } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 1ace040..6ef8a6c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -879,23 +879,23 @@ public class ParquetFileReader implements Closeable { * @author Julien Le Dem * */ - private class Chunk extends ByteBufferInputStream { + private class Chunk { - private final ChunkDescriptor descriptor; + protected final ChunkDescriptor descriptor; + protected final ByteBufferInputStream stream; /** * * @param descriptor descriptor for the chunk - * @param data contains the chunk data at offset - * @param offset where the chunk starts in offset + * @param buffers ByteBuffers that contain the chunk */ - public Chunk(ChunkDescriptor descriptor, ByteBuffer data, int offset) { - super(data, offset, descriptor.size); + public Chunk(ChunkDescriptor descriptor, List buffers) { this.descriptor = descriptor; + this.stream = ByteBufferInputStream.wrap(buffers); } protected PageHeader readPageHeader() throws IOException { - return Util.readPageHeader(this); + return Util.readPageHeader(stream); } /** @@ -967,7 +967,7 @@ public class ParquetFileReader implements Closeable { break; default: 
LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize); - this.skip(compressedPageSize); + stream.skipFully(compressedPageSize); break; } } @@ -977,29 +977,19 @@ public class ParquetFileReader implements Closeable { "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " + getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() + " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() - + " pages ending at file offset " + (descriptor.fileOffset + pos())); + + " pages ending at file offset " + (descriptor.fileOffset + stream.position())); } BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec()); return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage); } /** - * @return the current position in the chunk - */ - public int pos() { - return this.byteBuf.position(); - } - - /** * @param size the size of the page * @return the page * @throws IOException */ public BytesInput readAsBytesInput(int size) throws IOException { - int pos = this.byteBuf.position(); - final BytesInput r = BytesInput.from(this.byteBuf, pos, size); - this.byteBuf.position(pos + size); - return r; + return BytesInput.from(stream.sliceBuffers(size)); } } @@ -1016,44 +1006,51 @@ public class ParquetFileReader implements Closeable { /** * @param descriptor the descriptor of the chunk - * @param byteBuf contains the data of the chunk at offset - * @param offset where the chunk starts in data * @param f the file stream positioned at the end of this chunk */ - private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, SeekableInputStream f) { - super(descriptor, byteBuf, offset); + private WorkaroundChunk(ChunkDescriptor descriptor, List buffers, SeekableInputStream f) { + super(descriptor, buffers); this.f = f; } protected PageHeader readPageHeader() throws IOException { PageHeader pageHeader; - int initialPos = pos(); + stream.mark(8192); // headers should not be larger than 8k try { - pageHeader = Util.readPageHeader(this); + pageHeader = Util.readPageHeader(stream); } catch (IOException e) { // this is to workaround a bug where the compressedLength // of the chunk is missing the size of the header of the dictionary // to allow reading older files (using dictionary) we need this. // usually 13 to 19 bytes are missing // if the last page is smaller than this, the page header itself is truncated in the buffer. - this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error + stream.reset(); // resetting the buffer to the position before we got the error LOG.info("completing the column chunk to read the page header"); - pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream. + pageHeader = Util.readPageHeader(new SequenceInputStream(stream, f)); // trying again from the buffer + remainder of the stream. } return pageHeader; } public BytesInput readAsBytesInput(int size) throws IOException { - if (pos() + size > initPos + count) { + int available = stream.available(); + if (size > available) { // this is to workaround a bug where the compressedLength // of the chunk is missing the size of the header of the dictionary // to allow reading older files (using dictionary) we need this. 
@@ -1016,44 +1006,51 @@
 
     /**
      * @param descriptor the descriptor of the chunk
-     * @param byteBuf contains the data of the chunk at offset
-     * @param offset where the chunk starts in data
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, SeekableInputStream f) {
-      super(descriptor, byteBuf, offset);
+    private WorkaroundChunk(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStream f) {
+      super(descriptor, buffers);
       this.f = f;
     }
 
     protected PageHeader readPageHeader() throws IOException {
       PageHeader pageHeader;
-      int initialPos = pos();
+      stream.mark(8192); // headers should not be larger than 8k
       try {
-        pageHeader = Util.readPageHeader(this);
+        pageHeader = Util.readPageHeader(stream);
       } catch (IOException e) {
         // this is to workaround a bug where the compressedLength
         // of the chunk is missing the size of the header of the dictionary
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
+        stream.reset(); // resetting the buffer to the position before we got the error
        LOG.info("completing the column chunk to read the page header");
-        pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
+        pageHeader = Util.readPageHeader(new SequenceInputStream(stream, f)); // trying again from the buffer + remainder of the stream.
       }
       return pageHeader;
     }
 
     public BytesInput readAsBytesInput(int size) throws IOException {
-      if (pos() + size > initPos + count) {
+      int available = stream.available();
+      if (size > available) {
        // this is to workaround a bug where the compressedLength
        // of the chunk is missing the size of the header of the dictionary
        // to allow reading older files (using dictionary) we need this.
        // usually 13 to 19 bytes are missing
-        int l1 = initPos + count - pos();
-        int l2 = size - l1;
-        LOG.info("completed the column chunk with {} bytes", l2);
-        return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
+        int missingBytes = size - available;
+        LOG.info("completed the column chunk with {} bytes", missingBytes);
+
+        List<ByteBuffer> buffers = new ArrayList<>();
+        buffers.addAll(stream.sliceBuffers(available));
+
+        ByteBuffer lastBuffer = ByteBuffer.allocate(missingBytes);
+        f.readFully(lastBuffer);
+        buffers.add(lastBuffer);
+
+        return BytesInput.from(buffers);
       }
+
       return super.readAsBytesInput(size);
     }
 
@@ -1126,22 +1123,36 @@
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
 
-      // Allocate the bytebuffer based on whether the FS can support it.
-      ByteBuffer chunksByteBuffer = options.getAllocator().allocate(length);
-      f.readFully(chunksByteBuffer);
+      int fullAllocations = length / options.getMaxAllocationSize();
+      int lastAllocationSize = length % options.getMaxAllocationSize();
+
+      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
+      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
+
+      for (int i = 0; i < fullAllocations; i += 1) {
+        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
+      }
+
+      if (lastAllocationSize > 0) {
+        buffers.add(options.getAllocator().allocate(lastAllocationSize));
+      }
+
+      for (ByteBuffer buffer : buffers) {
+        f.readFully(buffer);
+        buffer.flip();
+      }
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
-      int currentChunkOffset = 0;
+
+      ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffers);
       for (int i = 0; i < chunks.size(); i++) {
         ChunkDescriptor descriptor = chunks.get(i);
         if (i < chunks.size() - 1) {
-          result.add(new Chunk(descriptor, chunksByteBuffer, currentChunkOffset));
+          result.add(new Chunk(descriptor, stream.sliceBuffers(descriptor.size)));
         } else {
           // because of a bug, the last chunk might be larger than descriptor.size
-          result.add(new WorkaroundChunk(descriptor, chunksByteBuffer, currentChunkOffset, f));
+          result.add(new WorkaroundChunk(descriptor, stream.sliceBuffers(descriptor.size), f));
         }
-        currentChunkOffset += descriptor.size;
       }
       return result ;
     }
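The buffer sizing above is plain integer arithmetic; with the illustrative numbers below (a 270 MB read against a 128 MB cap) it yields two full buffers plus one 14 MB remainder:

    int length = 270 * 1024 * 1024;             // total bytes to read
    int maxAllocationSize = 128 * 1024 * 1024;  // options.getMaxAllocationSize()
    int fullAllocations = length / maxAllocationSize;     // 2
    int lastAllocationSize = length % maxAllocationSize;  // 14 MB
    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0); // 3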
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
index 3dd17e9..9d9a72f 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -73,7 +73,7 @@ public class TestDirectCodecFactory {
       if (useOnHeapCompression) {
         compressed = c.compress(BytesInput.from(rawArr));
       } else {
-        compressed = c.compress(BytesInput.from(rawBuf, 0, rawBuf.remaining()));
+        compressed = c.compress(BytesInput.from(rawBuf));
       }
 
       switch (decomp) {
@@ -95,11 +95,11 @@
 
         case OFF_HEAP_BYTES_INPUT: {
           final ByteBuffer buf = compressed.toByteBuffer();
-          final ByteBuffer b = allocator.allocate(buf.capacity());
+          final ByteBuffer b = allocator.allocate(buf.limit());
           try {
             b.put(buf);
             b.flip();
-            final BytesInput input = d.decompress(BytesInput.from(b, 0, b.capacity()), size);
+            final BytesInput input = d.decompress(BytesInput.from(b), size);
             Assert.assertArrayEquals(
                 String.format("While testing codec %s", codec),
                 input.toByteArray(), rawArr);