parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [3/4] parquet-mr git commit: PARQUET-1142: Add alternatives to Hadoop classes in the API
Date Wed, 13 Dec 2017 19:28:01 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java b/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
new file mode 100644
index 0000000..078bc8f
--- /dev/null
+++ b/parquet-common/src/test/java/org/apache/parquet/io/TestDelegatingSeekableInputStream.java
@@ -0,0 +1,861 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import org.apache.parquet.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+
+import static org.apache.parquet.io.MockInputStream.TEST_ARRAY;
+
+
+public class TestDelegatingSeekableInputStream {
+
+  @Test
+  public void testReadFully() throws Exception {
+    byte[] buffer = new byte[5];
+
+    MockInputStream stream = new MockInputStream();
+    DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+    Assert.assertArrayEquals("Byte array contents should match",
+        Arrays.copyOfRange(TEST_ARRAY, 0, 5), buffer);
+    Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+  }
+
+  @Test
+  public void testReadFullySmallReads() throws Exception {
+    byte[] buffer = new byte[5];
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+    DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+    Assert.assertArrayEquals("Byte array contents should match",
+        Arrays.copyOfRange(TEST_ARRAY, 0, 5), buffer);
+    Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+  }
+
+  @Test
+  public void testReadFullyJustRight() throws Exception {
+    final byte[] buffer = new byte[10];
+
+    final MockInputStream stream = new MockInputStream(2, 3, 3);
+    DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+    Assert.assertArrayEquals("Byte array contents should match", TEST_ARRAY, buffer);
+    Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos());
+
+    TestUtils.assertThrows("Should throw EOFException if no more bytes left",
+        EOFException.class, new Callable<Void>() {
+          @Override
+          public Void call() throws IOException {
+            DelegatingSeekableInputStream.readFully(stream, buffer, 0, 1);
+            return null;
+          }
+        });
+  }
+
+  @Test
+  public void testReadFullyUnderflow() throws Exception {
+    final byte[] buffer = new byte[11];
+
+    final MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    TestUtils.assertThrows("Should throw EOFException if no more bytes left",
+        EOFException.class, new Callable<Void>() {
+          @Override
+          public Void call() throws IOException {
+            DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+            return null;
+          }
+        });
+
+    Assert.assertArrayEquals("Should have consumed bytes",
+        TEST_ARRAY, Arrays.copyOfRange(buffer, 0, 10));
+    Assert.assertEquals("Stream position should reflect bytes read", 10, stream.getPos());
+  }
+
+  @Test
+  public void testReadFullyStartAndLength() throws IOException {
+    byte[] buffer = new byte[10];
+
+    MockInputStream stream = new MockInputStream();
+    DelegatingSeekableInputStream.readFully(stream, buffer, 2, 5);
+
+    Assert.assertArrayEquals("Byte array contents should match",
+        Arrays.copyOfRange(TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7));
+    Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+  }
+
+  @Test
+  public void testReadFullyZeroByteRead() throws IOException {
+    byte[] buffer = new byte[0];
+
+    MockInputStream stream = new MockInputStream();
+    DelegatingSeekableInputStream.readFully(stream, buffer, 0, buffer.length);
+
+    Assert.assertEquals("Stream position should reflect bytes read", 0, stream.getPos());
+  }
+
+  @Test
+  public void testReadFullySmallReadsWithStartAndLength() throws IOException {
+    byte[] buffer = new byte[10];
+
+    MockInputStream stream = new MockInputStream(2, 2, 3);
+    DelegatingSeekableInputStream.readFully(stream, buffer, 2, 5);
+
+    Assert.assertArrayEquals("Byte array contents should match",
+        Arrays.copyOfRange(TEST_ARRAY, 0, 5), Arrays.copyOfRange(buffer, 2, 7));
+    Assert.assertEquals("Stream position should reflect bytes read", 5, stream.getPos());
+  }
+
+  private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
+    @Override
+    protected byte[] initialValue() {
+      return new byte[8192];
+    }
+  };
+
+  @Test
+  public void testHeapRead() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+    MockInputStream stream = new MockInputStream();
+
+    int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, len);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(-1, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapSmallBuffer() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(5);
+
+    MockInputStream stream = new MockInputStream();
+
+    int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(5, len);
+    Assert.assertEquals(5, readBuffer.position());
+    Assert.assertEquals(5, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(0, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
+  }
+
+  @Test
+  public void testHeapSmallReads() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(2, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(3, len);
+    Assert.assertEquals(5, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(3, len);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapPosition() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(20);
+    readBuffer.position(10);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(8);
+
+    int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(8, len);
+    Assert.assertEquals(18, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(20, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(-1, len);
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapLimit() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(20);
+    readBuffer.limit(8);
+
+    MockInputStream stream = new MockInputStream(7);
+
+    int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(7, len);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(1, len);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(0, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testHeapPositionAndLimit() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(20);
+    readBuffer.position(5);
+    readBuffer.limit(13);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(7);
+
+    int len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(7, len);
+    Assert.assertEquals(12, readBuffer.position());
+    Assert.assertEquals(13, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(1, len);
+    Assert.assertEquals(13, readBuffer.position());
+    Assert.assertEquals(13, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(0, len);
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testDirectRead() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+    MockInputStream stream = new MockInputStream();
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, len);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(-1, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectSmallBuffer() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(5);
+
+    MockInputStream stream = new MockInputStream();
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(5, len);
+    Assert.assertEquals(5, readBuffer.position());
+    Assert.assertEquals(5, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(0, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 5), readBuffer);
+  }
+
+  @Test
+  public void testDirectSmallReads() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(2, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(3, len);
+    Assert.assertEquals(5, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(3, len);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectPosition() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+    readBuffer.position(10);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(8);
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(8, len);
+    Assert.assertEquals(18, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(20, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(-1, len);
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectLimit() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(20);
+    readBuffer.limit(8);
+
+    MockInputStream stream = new MockInputStream(7);
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, len);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(1, len);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(0, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testDirectPositionAndLimit() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+    readBuffer.position(5);
+    readBuffer.limit(13);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(7);
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, len);
+    Assert.assertEquals(12, readBuffer.position());
+    Assert.assertEquals(13, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(1, len);
+    Assert.assertEquals(13, readBuffer.position());
+    Assert.assertEquals(13, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(0, len);
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testDirectSmallTempBufferSmallReads() throws Exception {
+    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
+
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(2, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(3, len);
+    Assert.assertEquals(5, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(3, len);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(2, len);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(-1, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
+    byte[] temp = new byte[2]; // this will cause readDirectBuffer to loop
+
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+    readBuffer.position(5);
+    readBuffer.limit(13);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(7);
+
+    int len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(7, len);
+    Assert.assertEquals(12, readBuffer.position());
+    Assert.assertEquals(13, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(1, len);
+    Assert.assertEquals(13, readBuffer.position());
+    Assert.assertEquals(13, readBuffer.limit());
+
+    len = DelegatingSeekableInputStream.readDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(0, len);
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullySmallBuffer() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocate(8);
+
+    MockInputStream stream = new MockInputStream();
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyLargeBuffer() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+    final MockInputStream stream = new MockInputStream();
+
+    TestUtils.assertThrows("Should throw EOFException",
+        EOFException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+            return null;
+          }
+        });
+
+    Assert.assertEquals(0, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+  }
+
+  @Test
+  public void testHeapReadFullyJustRight() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+    MockInputStream stream = new MockInputStream();
+
+    // reads all of the bytes available without EOFException
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    // trying to read 0 more bytes doesn't result in EOFException
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullySmallReads() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyPosition() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+    readBuffer.position(3);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+    readBuffer.limit(7);
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyPositionAndLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+    readBuffer.position(3);
+    readBuffer.limit(7);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    DelegatingSeekableInputStream.readFullyHeapBuffer(stream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullySmallBuffer() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
+
+    MockInputStream stream = new MockInputStream();
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullyLargeBuffer() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+    final MockInputStream stream = new MockInputStream();
+
+    TestUtils.assertThrows("Should throw EOFException",
+        EOFException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+            return null;
+          }
+        });
+
+    // NOTE: This behavior differs from readFullyHeapBuffer because direct uses
+    // several read operations that will read up to the end of the input. This
+    // is a correct value because the bytes in the buffer are valid. This
+    // behavior can't be implemented for the heap buffer without using the read
+    // method instead of the readFully method on the underlying
+    // FSDataInputStream.
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+  }
+
+  @Test
+  public void testDirectReadFullyJustRight() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+    MockInputStream stream = new MockInputStream();
+
+    // reads all of the bytes available without EOFException
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    // trying to read 0 more bytes doesn't result in EOFException
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullySmallReads() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullyPosition() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.position(3);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullyLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.limit(7);
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullyPositionAndLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.position(3);
+    readBuffer.limit(7);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
+    byte[] temp = new byte[2]; // this will cause readFully to loop
+
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.position(3);
+    readBuffer.limit(7);
+    readBuffer.mark();
+
+    MockInputStream stream = new MockInputStream(2, 3, 3);
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    DelegatingSeekableInputStream.readFullyDirectBuffer(stream, readBuffer, temp);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
deleted file mode 100644
index 6e593c2..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ /dev/null
@@ -1,486 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.nio.ByteBuffer;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * A source of bytes capable of writing itself to an output.
- * A BytesInput should be consumed right away.
- * It is not a container.
- * For example if it is referring to a stream,
- * subsequent BytesInput reads from the stream will be incorrect
- * if the previous has not been consumed.
- *
- * @author Julien Le Dem
- *
- */
-abstract public class BytesInput {
-  private static final Logger LOG = LoggerFactory.getLogger(BytesInput.class);
-  private static final boolean DEBUG = false;//Log.DEBUG;
-  private static final EmptyBytesInput EMPTY_BYTES_INPUT = new EmptyBytesInput();
-
-  /**
-   * logically concatenate the provided inputs
-   * @param inputs the inputs to concatenate
-   * @return a concatenated input
-   */
-  public static BytesInput concat(BytesInput... inputs) {
-    return new SequenceBytesIn(Arrays.asList(inputs));
-  }
-
-  /**
-   * logically concatenate the provided inputs
-   * @param inputs the inputs to concatenate
-   * @return a concatenated input
-   */
-  public static BytesInput concat(List<BytesInput> inputs) {
-    return new SequenceBytesIn(inputs);
-  }
-
-  /**
-   * @param in
-   * @param bytes number of bytes to read
-   * @return a BytesInput that will read that number of bytes from the stream
-   */
-  public static BytesInput from(InputStream in, int bytes) {
-    return new StreamBytesInput(in, bytes);
-  }
-  
-  /**
-   * @param buffer
-   * @param length number of bytes to read
-   * @return a BytesInput that will read the given bytes from the ByteBuffer
-   */
-  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
-    return new ByteBufferBytesInput(buffer, offset, length);
-  }
-
-  /**
-   *
-   * @param in
-   * @return a Bytes input that will write the given bytes
-   */
-  public static BytesInput from(byte[] in) {
-    LOG.debug("BytesInput from array of {} bytes", in.length);
-    return new ByteArrayBytesInput(in, 0 , in.length);
-  }
-
-  public static BytesInput from(byte[] in, int offset, int length) {
-    LOG.debug("BytesInput from array of {} bytes", length);
-    return new ByteArrayBytesInput(in, offset, length);
-  }
-
-  /**
-   * @param intValue the int to write
-   * @return a BytesInput that will write 4 bytes in little endian
-   */
-  public static BytesInput fromInt(int intValue) {
-    return new IntBytesInput(intValue);
-  }
-
-  /**
-   * @param intValue the int to write
-   * @return a BytesInput that will write var int
-   */
-  public static BytesInput fromUnsignedVarInt(int intValue) {
-    return new UnsignedVarIntBytesInput(intValue);
-  }
-
-  /**
-   *
-   * @param intValue the int to write
-   */
-  public static BytesInput fromZigZagVarInt(int intValue) {
-    int zigZag = (intValue << 1) ^ (intValue >> 31);
-    return new UnsignedVarIntBytesInput(zigZag);
-  }
-
-  /**
-   * @param longValue the long to write
-   * @return a BytesInput that will write var long
-   */
-  public static BytesInput fromUnsignedVarLong(long longValue) {
-    return new UnsignedVarLongBytesInput(longValue);
-  }
-
-  /**
-   *
-   * @param longValue the long to write
-   */
-  public static BytesInput fromZigZagVarLong(long longValue) {
-    long zigZag = (longValue << 1) ^ (longValue >> 63);
-    return new UnsignedVarLongBytesInput(zigZag);
-  }
-
-  /**
-   * @param arrayOut
-   * @return a BytesInput that will write the content of the buffer
-   */
-  public static BytesInput from(CapacityByteArrayOutputStream arrayOut) {
-    return new CapacityBAOSBytesInput(arrayOut);
-  }
-
-  /**
-   * @param baos - stream to wrap into a BytesInput
-   * @return a BytesInput that will write the content of the buffer
-   */
-  public static BytesInput from(ByteArrayOutputStream baos) {
-    return new BAOSBytesInput(baos);
-  }
-
-  /**
-   * @return an empty bytes input
-   */
-  public static BytesInput empty() {
-    return EMPTY_BYTES_INPUT;
-  }
-
-  /**
-   * copies the input into a new byte array
-   * @param bytesInput
-   * @return
-   * @throws IOException
-   */
-  public static BytesInput copy(BytesInput bytesInput) throws IOException {
-    return from(bytesInput.toByteArray());
-  }
-
-  /**
-   * writes the bytes into a stream
-   * @param out
-   * @throws IOException
-   */
-  abstract public void writeAllTo(OutputStream out) throws IOException;
-
-  /**
-   *
-   * @return a new byte array materializing the contents of this input
-   * @throws IOException
-   */
-  public byte[] toByteArray() throws IOException {
-    BAOS baos = new BAOS((int)size());
-    this.writeAllTo(baos);
-    LOG.debug("converted {} to byteArray of {} bytes", size() , baos.size());
-    return baos.getBuf();
-  }
-
-  /**
-   *
-   * @return a new ByteBuffer materializing the contents of this input
-   * @throws IOException
-   */
-  public ByteBuffer toByteBuffer() throws IOException {
-    return ByteBuffer.wrap(toByteArray());
-  }
-
-  /**
-   *
-   * @return a new InputStream materializing the contents of this input
-   * @throws IOException
-   */
-  public InputStream toInputStream() throws IOException {
-    return new ByteBufferInputStream(toByteBuffer());
-  }
-
-  /**
-   *
-   * @return the size in bytes that would be written
-   */
-  abstract public long size();
-
-  private static final class BAOS extends ByteArrayOutputStream {
-    private BAOS(int size) {
-      super(size);
-    }
-
-    public byte[] getBuf() {
-      return this.buf;
-    }
-  }
-
-  private static class StreamBytesInput extends BytesInput {
-    private static final Logger LOG = LoggerFactory.getLogger(BytesInput.StreamBytesInput.class);
-    private final InputStream in;
-    private final int byteCount;
-
-    private StreamBytesInput(InputStream in, int byteCount) {
-      super();
-      this.in = in;
-      this.byteCount = byteCount;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      LOG.debug("write All {} bytes", byteCount);
-      // TODO: more efficient
-      out.write(this.toByteArray());
-    }
-
-    public byte[] toByteArray() throws IOException {
-      LOG.debug("read all {} bytes", byteCount);
-      byte[] buf = new byte[byteCount];
-      new DataInputStream(in).readFully(buf);
-      return buf;
-    }
-
-    @Override
-    public long size() {
-      return byteCount;
-    }
-
-  }
-
-  private static class SequenceBytesIn extends BytesInput {
-    private static final Logger LOG = LoggerFactory.getLogger(BytesInput.SequenceBytesIn.class);
-
-    private final List<BytesInput> inputs;
-    private final long size;
-
-    private SequenceBytesIn(List<BytesInput> inputs) {
-      this.inputs = inputs;
-      long total = 0;
-      for (BytesInput input : inputs) {
-        total += input.size();
-      }
-      this.size = total;
-    }
-
-    @SuppressWarnings("unused")
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      for (BytesInput input : inputs) {
-
-        LOG.debug("write {} bytes to out", input.size());
-        if (input instanceof SequenceBytesIn) LOG.debug("{");
-        input.writeAllTo(out);
-        if (input instanceof SequenceBytesIn) LOG.debug("}");
-      }
-    }
-
-    @Override
-    public long size() {
-      return size;
-    }
-
-  }
-
-  private static class IntBytesInput extends BytesInput {
-
-    private final int intValue;
-
-    public IntBytesInput(int intValue) {
-      this.intValue = intValue;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      BytesUtils.writeIntLittleEndian(out, intValue);
-    }
-
-    public ByteBuffer toByteBuffer() throws IOException {
-      return ByteBuffer.allocate(4).putInt(0, intValue);
-    }
-
-    @Override
-    public long size() {
-      return 4;
-    }
-
-  }
-
-  private static class UnsignedVarIntBytesInput extends BytesInput {
-
-    private final int intValue;
-
-    public UnsignedVarIntBytesInput(int intValue) {
-      this.intValue = intValue;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      BytesUtils.writeUnsignedVarInt(intValue, out);
-    }
-
-    public ByteBuffer toByteBuffer() throws IOException {
-      ByteBuffer ret = ByteBuffer.allocate((int) size());
-      BytesUtils.writeUnsignedVarInt(intValue, ret);
-      return ret;
-    }
-
-    @Override
-    public long size() {
-      int s = (38 - Integer.numberOfLeadingZeros(intValue)) / 7;
-      return s == 0 ? 1 : s;
-    }
-  }
-
-  private static class UnsignedVarLongBytesInput extends BytesInput {
-
-    private final long longValue;
-
-    public UnsignedVarLongBytesInput(long longValue) {
-      this.longValue = longValue;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      BytesUtils.writeUnsignedVarLong(longValue, out);
-    }
-
-    @Override
-    public long size() {
-      int s = (70 - Long.numberOfLeadingZeros(longValue)) / 7;
-      return s == 0 ? 1 : s;
-    }
-  }
-
-  private static class EmptyBytesInput extends BytesInput {
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-    }
-
-    @Override
-    public long size() {
-      return 0;
-    }
-
-    public ByteBuffer toByteBuffer() throws IOException {
-      return ByteBuffer.allocate(0);
-    }
-
-  }
-
-  private static class CapacityBAOSBytesInput extends BytesInput {
-
-    private final CapacityByteArrayOutputStream arrayOut;
-
-    private CapacityBAOSBytesInput(CapacityByteArrayOutputStream arrayOut) {
-      this.arrayOut = arrayOut;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      arrayOut.writeTo(out);
-    }
-
-    @Override
-    public long size() {
-      return arrayOut.size();
-    }
-
-  }
-
-  private static class BAOSBytesInput extends BytesInput {
-
-    private final ByteArrayOutputStream arrayOut;
-
-    private BAOSBytesInput(ByteArrayOutputStream arrayOut) {
-      this.arrayOut = arrayOut;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      arrayOut.writeTo(out);
-    }
-
-    @Override
-    public long size() {
-      return arrayOut.size();
-    }
-
-  }
-
-  private static class ByteArrayBytesInput extends BytesInput {
-
-    private final byte[] in;
-    private final int offset;
-    private final int length;
-
-    private ByteArrayBytesInput(byte[] in, int offset, int length) {
-      this.in = in;
-      this.offset = offset;
-      this.length = length;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      out.write(in, offset, length);
-    }
-
-    public ByteBuffer toByteBuffer() throws IOException {
-      return ByteBuffer.wrap(in, offset, length);
-    }
-
-    @Override
-    public long size() {
-      return length;
-    }
-
-  }
-  
-  private static class ByteBufferBytesInput extends BytesInput {
-    
-    private final ByteBuffer byteBuf;
-    private final int length;
-    private final int offset;
-
-    private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
-      this.byteBuf = byteBuf;
-      this.offset = offset;
-      this.length = length;
-    }
-
-    @Override
-    public void writeAllTo(OutputStream out) throws IOException {
-      final WritableByteChannel outputChannel = Channels.newChannel(out);
-      byteBuf.position(offset);
-      ByteBuffer tempBuf = byteBuf.slice();
-      tempBuf.limit(length);
-      outputChannel.write(tempBuf);
-    }
-    
-    @Override
-    public ByteBuffer toByteBuffer() throws IOException {
-      byteBuf.position(offset);
-      ByteBuffer buf = byteBuf.slice();
-      buf.limit(length);
-      return buf;
-    }
-
-    @Override
-    public long size() {
-      return length;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
deleted file mode 100644
index 92674d4..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import static java.lang.Math.max;
-import static java.lang.Math.pow;
-import static java.lang.String.format;
-import static org.apache.parquet.Preconditions.checkArgument;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.parquet.OutputStreamCloseException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
- * Where ByteArrayOutputStream is backed by a single array that "grows" by copying into a new larger array, this output
- * stream grows by allocating a new array (slab) and adding it to a list of previous arrays.
- *
- * Each new slab is allocated to be the same size as all the previous slabs combined, so these allocations become
- * exponentially less frequent, just like ByteArrayOutputStream, with one difference. This output stream accepts a
- * max capacity hint, which is a hint describing the max amount of data that will be written to this stream. As the
- * total size of this stream nears this max, this stream starts to grow linearly instead of exponentially.
- * So new slabs are allocated to be 1/5th of the max capacity hint,
- * instead of equal to the total previous size of all slabs. This is useful because it prevents allocating roughly
- * twice the needed space when a new slab is added just before the stream is done being used.
- *
- * When reusing this stream it will adjust the initial slab size based on the previous data size, aiming for fewer
- * allocations, with the assumption that a similar amount of data will be written to this stream on re-use.
- * See ({@link CapacityByteArrayOutputStream#reset()}).
- *
- * @author Julien Le Dem
- *
- */
-public class CapacityByteArrayOutputStream extends OutputStream {
-  private static final Logger LOG = LoggerFactory.getLogger(CapacityByteArrayOutputStream.class);
-  private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]);
-
-  private int initialSlabSize;
-  private final int maxCapacityHint;
-  private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
-
-  private ByteBuffer currentSlab;
-  private int currentSlabIndex;
-  private int bytesAllocated = 0;
-  private int bytesUsed = 0;
-  private ByteBufferAllocator allocator;
-
-  /**
-   * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
-   * will end up allocating targetNumSlabs in order to reach targetCapacity. This aims to be
-   * a balance between the overhead of creating new slabs and wasting memory by eagerly making
-   * initial slabs too big.
-   *
-   * Note that targetCapacity here need not match maxCapacityHint in the constructor of
-   * CapacityByteArrayOutputStream, though often that would make sense.
-   *
-   * @param minSlabSize no matter what we shouldn't make slabs any smaller than this
-   * @param targetCapacity after we've allocated targetNumSlabs how much capacity should we have?
-   * @param targetNumSlabs how many slabs should it take to reach targetCapacity?
-   */
-  public static int initialSlabSizeHeuristic(int minSlabSize, int targetCapacity, int targetNumSlabs) {
-    // initialSlabSize = (targetCapacity / (2^targetNumSlabs)) means we double targetNumSlabs times
-    // before reaching the targetCapacity
-    // eg for page size of 1MB we start at 1024 bytes.
-    // we also don't want to start too small, so we also apply a minimum.
-    return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
-  }
-
-  public static CapacityByteArrayOutputStream withTargetNumSlabs(
-      int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
-    return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, new HeapByteBufferAllocator());
-  }
-
-  /**
-   * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
-   * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
-   */
-  public static CapacityByteArrayOutputStream withTargetNumSlabs(
-      int minSlabSize, int maxCapacityHint, int targetNumSlabs, ByteBufferAllocator allocator) {
-
-    return new CapacityByteArrayOutputStream(
-        initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
-        maxCapacityHint, allocator);
-  }
-
-  /**
-   * Defaults maxCapacityHint to 1MB
-   * @param initialSlabSize
-   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
-   */
-  @Deprecated
-  public CapacityByteArrayOutputStream(int initialSlabSize) {
-    this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator());
-  }
-
-  /**
-   * Defaults maxCapacityHint to 1MB
-   * @param initialSlabSize
-   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
-   */
-  @Deprecated
-  public CapacityByteArrayOutputStream(int initialSlabSize, ByteBufferAllocator allocator) {
-    this(initialSlabSize, 1024 * 1024, allocator);
-  }
-
-  /**
-   * @param initialSlabSize the size to make the first slab
-   * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
-   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
-   */
-  @Deprecated
-  public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
-    this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator());
-  }
-
-  /**
-   * @param initialSlabSize the size to make the first slab
-   * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
-   */
-  public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint, ByteBufferAllocator allocator) {
-    checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
-    checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
-    checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint));
-    this.initialSlabSize = initialSlabSize;
-    this.maxCapacityHint = maxCapacityHint;
-    this.allocator = allocator;
-    reset();
-  }
-
-  /**
-   * the new slab is guaranteed to be at least minimumSize
-   * @param minimumSize the size of the data we want to copy in the new slab
-   */
-  private void addSlab(int minimumSize) {
-    int nextSlabSize;
-
-    if (bytesUsed == 0) {
-      nextSlabSize = initialSlabSize;
-    } else if (bytesUsed > maxCapacityHint / 5) {
-      // to avoid an overhead of up to twice the needed size, we get linear when approaching target page size
-      nextSlabSize = maxCapacityHint / 5;
-    } else {
-      // double the size every time
-      nextSlabSize = bytesUsed;
-    }
-
-    if (nextSlabSize < minimumSize) {
-      LOG.debug("slab size {} too small for value of size {}. Bumping up slab size", nextSlabSize, minimumSize);
-      nextSlabSize = minimumSize;
-    }
-
-    LOG.debug("used {} slabs, adding new slab of size {}", slabs.size(), nextSlabSize);
-
-    this.currentSlab = allocator.allocate(nextSlabSize);
-    this.slabs.add(currentSlab);
-    this.bytesAllocated += nextSlabSize;
-    this.currentSlabIndex = 0;
-  }
-
-  @Override
-  public void write(int b) {
-    if (!currentSlab.hasRemaining()) {
-      addSlab(1);
-    }
-    currentSlab.put(currentSlabIndex, (byte) b);
-    currentSlabIndex += 1;
-    currentSlab.position(currentSlabIndex);
-    bytesUsed += 1;
-  }
-
-  @Override
-  public void write(byte b[], int off, int len) {
-    if ((off < 0) || (off > b.length) || (len < 0) ||
-        ((off + len) - b.length > 0)) {
-      throw new IndexOutOfBoundsException(
-          String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
-    }
-    if (len >= currentSlab.remaining()) {
-      final int length1 = currentSlab.remaining();
-      currentSlab.put(b, off, length1);
-      bytesUsed += length1;
-      currentSlabIndex += length1;
-      final int length2 = len - length1;
-      addSlab(length2);
-      currentSlab.put(b, off + length1, length2);
-      currentSlabIndex = length2;
-      bytesUsed += length2;
-    } else {
-      currentSlab.put(b, off, len);
-      currentSlabIndex += len;
-      bytesUsed += len;
-    }
-  }
-
-  private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
-    if (buf.hasArray()) {
-      out.write(buf.array(), buf.arrayOffset(), len);
-    } else {
-      // The OutputStream interface only takes a byte[], unfortunately this means that a ByteBuffer
-      // not backed by a byte array must be copied to fulfil this interface
-      byte[] copy = new byte[len];
-      buf.flip();
-      buf.get(copy);
-      out.write(copy);
-    }
-  }
-
-  /**
-   * Writes the complete contents of this buffer to the specified output stream argument. the output
-   * stream's write method <code>out.write(slab, 0, slab.length)</code>) will be called once per slab.
-   *
-   * @param      out   the output stream to which to write the data.
-   * @exception  IOException  if an I/O error occurs.
-   */
-  public void writeTo(OutputStream out) throws IOException {
-    for (int i = 0; i < slabs.size() - 1; i++) {
-      writeToOutput(out, slabs.get(i), slabs.get(i).position());
-    }
-    writeToOutput(out, currentSlab, currentSlabIndex);
-  }
-
-  /**
-   * @return The total size in bytes of data written to this stream.
-   */
-  public long size() {
-    return bytesUsed;
-  }
-
-  /**
-   *
-   * @return The total size in bytes currently allocated for this stream.
-   */
-  public int getCapacity() {
-    return bytesAllocated;
-  }
-
-  /**
-   * When re-using an instance with reset, it will adjust slab size based on previous data size.
-   * The intent is to reuse the same instance for the same type of data (for example, the same column).
-   * The assumption is that the size in the buffer will be consistent.
-   */
-  public void reset() {
-    // readjust slab size.
-    // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size
-    this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
-    LOG.debug("initial slab of size {}", initialSlabSize);
-    for (ByteBuffer slab : slabs) {
-      allocator.release(slab);
-    }
-    this.slabs.clear();
-    this.bytesAllocated = 0;
-    this.bytesUsed = 0;
-    this.currentSlab = EMPTY_SLAB;
-    this.currentSlabIndex = 0;
-  }
-
-  /**
-   * @return the index of the last value written to this stream, which
-   * can be passed to {@link #setByte(long, byte)} in order to change it
-   */
-  public long getCurrentIndex() {
-    checkArgument(bytesUsed > 0, "This is an empty stream");
-    return bytesUsed - 1;
-  }
-
-  /**
-   * Replace the byte stored at position index in this stream with value
-   *
-   * @param index which byte to replace
-   * @param value the value to replace it with
-   */
-  public void setByte(long index, byte value) {
-    checkArgument(index < bytesUsed, "Index: " + index + " is >= the current size of: " + bytesUsed);
-
-    long seen = 0;
-    for (int i = 0; i < slabs.size(); i++) {
-      ByteBuffer slab = slabs.get(i);
-      if (index < seen + slab.limit()) {
-        // ok found index
-        slab.put((int)(index-seen), value);
-        break;
-      }
-      seen += slab.limit();
-    }
-  }
-
-  /**
-   * @param prefix  a prefix to be used for every new line in the string
-   * @return a text representation of the memory usage of this structure
-   */
-  public String memUsageString(String prefix) {
-    return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), getCapacity());
-  }
-
-  /**
-   * @return the total number of allocated slabs
-   */
-  int getSlabCount() {
-    return slabs.size();
-  }
-
-  @Override
-  public void close() {
-    for (ByteBuffer slab : slabs) {
-      allocator.release(slab);
-    }
-    try {
-      super.close();
-    }catch(IOException e){
-      throw new OutputStreamCloseException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
deleted file mode 100644
index d333168..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/ConcatenatingByteArrayCollector.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.lang.String.format;
-
-public class ConcatenatingByteArrayCollector extends BytesInput {
-  private final List<byte[]> slabs = new ArrayList<byte[]>();
-  private long size = 0;
-
-  public void collect(BytesInput bytesInput) throws IOException {
-    byte[] bytes = bytesInput.toByteArray();
-    slabs.add(bytes);
-    size += bytes.length;
-  }
-
-  public void reset() {
-    size = 0;
-    slabs.clear();
-  }
-
-  @Override
-  public void writeAllTo(OutputStream out) throws IOException {
-    for (byte[] slab : slabs) {
-      out.write(slab);
-    }
-  }
-
-  @Override
-  public long size() {
-    return size;
-  }
-
-  /**
-   * @param prefix  a prefix to be used for every new line in the string
-   * @return a text representation of the memory usage of this structure
-   */
-  public String memUsageString(String prefix) {
-    return format("%s %s %d slabs, %,d bytes", prefix, getClass().getSimpleName(), slabs.size(), size);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
deleted file mode 100644
index a092753..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataInputStream.java
+++ /dev/null
@@ -1,424 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Based on DataInputStream but little endian and without the String/char methods
- *
- * @author Julien Le Dem
- *
- */
-public final class LittleEndianDataInputStream extends InputStream {
-
-  private final InputStream in;
-
-  /**
-   * Creates a LittleEndianDataInputStream that uses the specified
-   * underlying InputStream.
-   *
-   * @param  in   the specified input stream
-   */
-  public LittleEndianDataInputStream(InputStream in) {
-    this.in = in;
-  }
-
-  /**
-   * See the general contract of the <code>readFully</code>
-   * method of <code>DataInput</code>.
-   * <p>
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @param      b   the buffer into which the data is read.
-   * @exception  EOFException  if this input stream reaches the end before
-   *             reading all the bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final void readFully(byte b[]) throws IOException {
-    readFully(b, 0, b.length);
-  }
-
-  /**
-   * See the general contract of the <code>readFully</code>
-   * method of <code>DataInput</code>.
-   * <p>
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @param      b     the buffer into which the data is read.
-   * @param      off   the start offset of the data.
-   * @param      len   the number of bytes to read.
-   * @exception  EOFException  if this input stream reaches the end before
-   *               reading all the bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final void readFully(byte b[], int off, int len) throws IOException {
-    if (len < 0)
-      throw new IndexOutOfBoundsException();
-    int n = 0;
-    while (n < len) {
-      int count = in.read(b, off + n, len - n);
-      if (count < 0)
-        throw new EOFException();
-      n += count;
-    }
-  }
-
-  /**
-   * See the general contract of the <code>skipBytes</code>
-   * method of <code>DataInput</code>.
-   * <p>
-   * Bytes for this operation are read from the contained
-   * input stream.
-   *
-   * @param      n   the number of bytes to be skipped.
-   * @return     the actual number of bytes skipped.
-   * @exception  IOException  if the contained input stream does not support
-   *             seek, or the stream has been closed and
-   *             the contained input stream does not support
-   *             reading after close, or another I/O error occurs.
-   */
-  public final int skipBytes(int n) throws IOException {
-    int total = 0;
-    int cur = 0;
-
-    while ((total<n) && ((cur = (int) in.skip(n-total)) > 0)) {
-      total += cur;
-    }
-
-    return total;
-  }
-
-  /**
-   * @return
-   * @throws IOException
-   * @see java.io.InputStream#read()
-   */
-  public int read() throws IOException {
-    return in.read();
-  }
-
-  /**
-   * @return
-   * @see java.lang.Object#hashCode()
-   */
-  public int hashCode() {
-    return in.hashCode();
-  }
-
-  /**
-   * @param b
-   * @return
-   * @throws IOException
-   * @see java.io.InputStream#read(byte[])
-   */
-  public int read(byte[] b) throws IOException {
-    return in.read(b);
-  }
-
-  /**
-   * @param obj
-   * @return
-   * @see java.lang.Object#equals(java.lang.Object)
-   */
-  public boolean equals(Object obj) {
-    return in.equals(obj);
-  }
-
-  /**
-   * @param b
-   * @param off
-   * @param len
-   * @return
-   * @throws IOException
-   * @see java.io.InputStream#read(byte[], int, int)
-   */
-  public int read(byte[] b, int off, int len) throws IOException {
-    return in.read(b, off, len);
-  }
-
-  /**
-   * @param n
-   * @return
-   * @throws IOException
-   * @see java.io.InputStream#skip(long)
-   */
-  public long skip(long n) throws IOException {
-    return in.skip(n);
-  }
-
-  /**
-   * @return
-   * @throws IOException
-   * @see java.io.InputStream#available()
-   */
-  public int available() throws IOException {
-    return in.available();
-  }
-
-  /**
-   * @throws IOException
-   * @see java.io.InputStream#close()
-   */
-  public void close() throws IOException {
-    in.close();
-  }
-
-  /**
-   * @param readlimit
-   * @see java.io.InputStream#mark(int)
-   */
-  public void mark(int readlimit) {
-    in.mark(readlimit);
-  }
-
-  /**
-   * @throws IOException
-   * @see java.io.InputStream#reset()
-   */
-  public void reset() throws IOException {
-    in.reset();
-  }
-
-  /**
-   * @return
-   * @see java.io.InputStream#markSupported()
-   */
-  public boolean markSupported() {
-    return in.markSupported();
-  }
-
-  /**
-   * See the general contract of the <code>readBoolean</code>
-   * method of <code>DataInput</code>.
-   * <p>
-   * Bytes for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the <code>boolean</code> value read.
-   * @exception  EOFException  if this input stream has reached the end.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final boolean readBoolean() throws IOException {
-    int ch = in.read();
-    if (ch < 0)
-      throw new EOFException();
-    return (ch != 0);
-  }
-
-  /**
-   * See the general contract of the <code>readByte</code>
-   * method of <code>DataInput</code>.
-   * <p>
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next byte of this input stream as a signed 8-bit
-   *             <code>byte</code>.
-   * @exception  EOFException  if this input stream has reached the end.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final byte readByte() throws IOException {
-    int ch = in.read();
-    if (ch < 0)
-      throw new EOFException();
-    return (byte)(ch);
-  }
-
-  /**
-   * See the general contract of the <code>readUnsignedByte</code>
-   * method of <code>DataInput</code>.
-   * <p>
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next byte of this input stream, interpreted as an
-   *             unsigned 8-bit number.
-   * @exception  EOFException  if this input stream has reached the end.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see         java.io.FilterInputStream#in
-   */
-  public final int readUnsignedByte() throws IOException {
-    int ch = in.read();
-    if (ch < 0)
-      throw new EOFException();
-    return ch;
-  }
-
-  /**
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next two bytes of this input stream, interpreted as a
-   *             signed 16-bit number.
-   * @exception  EOFException  if this input stream reaches the end before
-   *               reading two bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final short readShort() throws IOException {
-    int ch2 = in.read();
-    int ch1 = in.read();
-    if ((ch1 | ch2) < 0)
-      throw new EOFException();
-    return (short)((ch1 << 8) + (ch2 << 0));
-  }
-
-  /**
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next two bytes of this input stream, interpreted as an
-   *             unsigned 16-bit integer.
-   * @exception  EOFException  if this input stream reaches the end before
-   *             reading two bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final int readUnsignedShort() throws IOException {
-    int ch2 = in.read();
-    int ch1 = in.read();
-    if ((ch1 | ch2) < 0)
-      throw new EOFException();
-    return (ch1 << 8) + (ch2 << 0);
-  }
-
-  /**
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next four bytes of this input stream, interpreted as an
-   *             <code>int</code>.
-   * @exception  EOFException  if this input stream reaches the end before
-   *               reading four bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final int readInt() throws IOException {
-    // TODO: has this been benchmarked against two alternate implementations?
-    // 1) Integer.reverseBytes(in.readInt())
-    // 2) keep a member byte[4], wrapped by an IntBuffer with appropriate endianness set,
-    //    and call IntBuffer.get()
-    // Both seem like they might be faster.
-    int ch4 = in.read();
-    int ch3 = in.read();
-    int ch2 = in.read();
-    int ch1 = in.read();
-    if ((ch1 | ch2 | ch3 | ch4) < 0)
-      throw new EOFException();
-    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
-  }
-
-  private byte readBuffer[] = new byte[8];
-
-  /**
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next eight bytes of this input stream, interpreted as a
-   *             <code>long</code>.
-   * @exception  EOFException  if this input stream reaches the end before
-   *               reading eight bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.io.FilterInputStream#in
-   */
-  public final long readLong() throws IOException {
-    // TODO: see perf question above in readInt
-    readFully(readBuffer, 0, 8);
-    return (((long)readBuffer[7] << 56) +
-        ((long)(readBuffer[6] & 255) << 48) +
-        ((long)(readBuffer[5] & 255) << 40) +
-        ((long)(readBuffer[4] & 255) << 32) +
-        ((long)(readBuffer[3] & 255) << 24) +
-        ((readBuffer[2] & 255) << 16) +
-        ((readBuffer[1] & 255) <<  8) +
-        ((readBuffer[0] & 255) <<  0));
-  }
-
-  /**
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next four bytes of this input stream, interpreted as a
-   *             <code>float</code>.
-   * @exception  EOFException  if this input stream reaches the end before
-   *               reading four bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.lang.Float#intBitsToFloat(int)
-   */
-  public final float readFloat() throws IOException {
-    return Float.intBitsToFloat(readInt());
-  }
-
-  /**
-   * Bytes
-   * for this operation are read from the contained
-   * input stream.
-   *
-   * @return     the next eight bytes of this input stream, interpreted as a
-   *             <code>double</code>.
-   * @exception  EOFException  if this input stream reaches the end before
-   *               reading eight bytes.
-   * @exception  IOException   the stream has been closed and the contained
-   *             input stream does not support reading after close, or
-   *             another I/O error occurs.
-   * @see        java.lang.Double#longBitsToDouble(long)
-   */
-  public final double readDouble() throws IOException {
-    return Double.longBitsToDouble(readLong());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bfd9b4d/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
deleted file mode 100644
index 9d4a8a9..0000000
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.bytes;
-
-import org.apache.parquet.IOExceptionUtils;
-import org.apache.parquet.ParquetRuntimeException;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * Based on DataOutputStream but in little endian and without the String/char methods
- *
- * @author Julien Le Dem
- *
- */
-public class LittleEndianDataOutputStream extends OutputStream {
-
-  private final OutputStream out;
-
-  /**
-   * Creates a new data output stream to write data to the specified
-   * underlying output stream. The counter <code>written</code> is
-   * set to zero.
-   *
-   * @param   out   the underlying output stream, to be saved for later
-   *                use.
-   * @see     java.io.FilterOutputStream#out
-   */
-  public LittleEndianDataOutputStream(OutputStream out) {
-    this.out = out;
-  }
-
-  /**
-   * Writes the specified byte (the low eight bits of the argument
-   * <code>b</code>) to the underlying output stream. If no exception
-   * is thrown, the counter <code>written</code> is incremented by
-   * <code>1</code>.
-   * <p>
-   * Implements the <code>write</code> method of <code>OutputStream</code>.
-   *
-   * @param      b   the <code>byte</code> to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   */
-  public void write(int b) throws IOException {
-    out.write(b);
-  }
-
-  /**
-   * Writes <code>len</code> bytes from the specified byte array
-   * starting at offset <code>off</code> to the underlying output stream.
-   * If no exception is thrown, the counter <code>written</code> is
-   * incremented by <code>len</code>.
-   *
-   * @param      b     the data.
-   * @param      off   the start offset in the data.
-   * @param      len   the number of bytes to write.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   */
-  public void write(byte b[], int off, int len) throws IOException {
-    out.write(b, off, len);
-  }
-
-  /**
-   * Flushes this data output stream. This forces any buffered output
-   * bytes to be written out to the stream.
-   * <p>
-   * The <code>flush</code> method of <code>DataOutputStream</code>
-   * calls the <code>flush</code> method of its underlying output stream.
-   *
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   * @see        java.io.OutputStream#flush()
-   */
-  public void flush() throws IOException {
-    out.flush();
-  }
-
-  /**
-   * Writes a <code>boolean</code> to the underlying output stream as
-   * a 1-byte value. The value <code>true</code> is written out as the
-   * value <code>(byte)1</code>; the value <code>false</code> is
-   * written out as the value <code>(byte)0</code>. If no exception is
-   * thrown, the counter <code>written</code> is incremented by
-   * <code>1</code>.
-   *
-   * @param      v   a <code>boolean</code> value to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   */
-  public final void writeBoolean(boolean v) throws IOException {
-    out.write(v ? 1 : 0);
-  }
-
-  /**
-   * Writes out a <code>byte</code> to the underlying output stream as
-   * a 1-byte value. If no exception is thrown, the counter
-   * <code>written</code> is incremented by <code>1</code>.
-   *
-   * @param      v   a <code>byte</code> value to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   */
-  public final void writeByte(int v) throws IOException {
-    out.write(v);
-  }
-
-  /**
-   * Writes a <code>short</code> to the underlying output stream as two
-   * bytes, low byte first. If no exception is thrown, the counter
-   * <code>written</code> is incremented by <code>2</code>.
-   *
-   * @param      v   a <code>short</code> to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   */
-  public final void writeShort(int v) throws IOException {
-    out.write((v >>> 0) & 0xFF);
-    out.write((v >>> 8) & 0xFF);
-  }
-
-  /**
-   * Writes an <code>int</code> to the underlying output stream as four
-   * bytes, low byte first. If no exception is thrown, the counter
-   * <code>written</code> is incremented by <code>4</code>.
-   *
-   * @param      v   an <code>int</code> to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   */
-  public final void writeInt(int v) throws IOException {
-    // TODO: see note in LittleEndianDataInputStream: maybe faster
-    // to use Integer.reverseBytes() and then writeInt, or a ByteBuffer
-    // approach
-    out.write((v >>>  0) & 0xFF);
-    out.write((v >>>  8) & 0xFF);
-    out.write((v >>> 16) & 0xFF);
-    out.write((v >>> 24) & 0xFF);
-  }
-
-  private byte writeBuffer[] = new byte[8];
-
-  /**
-   * Writes a <code>long</code> to the underlying output stream as eight
-   * bytes, low byte first. In no exception is thrown, the counter
-   * <code>written</code> is incremented by <code>8</code>.
-   *
-   * @param      v   a <code>long</code> to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   */
-  public final void writeLong(long v) throws IOException {
-    writeBuffer[7] = (byte)(v >>> 56);
-    writeBuffer[6] = (byte)(v >>> 48);
-    writeBuffer[5] = (byte)(v >>> 40);
-    writeBuffer[4] = (byte)(v >>> 32);
-    writeBuffer[3] = (byte)(v >>> 24);
-    writeBuffer[2] = (byte)(v >>> 16);
-    writeBuffer[1] = (byte)(v >>>  8);
-    writeBuffer[0] = (byte)(v >>>  0);
-    out.write(writeBuffer, 0, 8);
-  }
-
-  /**
-   * Converts the float argument to an <code>int</code> using the
-   * <code>floatToIntBits</code> method in class <code>Float</code>,
-   * and then writes that <code>int</code> value to the underlying
-   * output stream as a 4-byte quantity, low byte first. If no
-   * exception is thrown, the counter <code>written</code> is
-   * incremented by <code>4</code>.
-   *
-   * @param      v   a <code>float</code> value to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   * @see        java.lang.Float#floatToIntBits(float)
-   */
-  public final void writeFloat(float v) throws IOException {
-    writeInt(Float.floatToIntBits(v));
-  }
-
-  /**
-   * Converts the double argument to a <code>long</code> using the
-   * <code>doubleToLongBits</code> method in class <code>Double</code>,
-   * and then writes that <code>long</code> value to the underlying
-   * output stream as an 8-byte quantity, low byte first. If no
-   * exception is thrown, the counter <code>written</code> is
-   * incremented by <code>8</code>.
-   *
-   * @param      v   a <code>double</code> value to be written.
-   * @exception  IOException  if an I/O error occurs.
-   * @see        java.io.FilterOutputStream#out
-   * @see        java.lang.Double#doubleToLongBits(double)
-   */
-  public final void writeDouble(double v) throws IOException {
-    writeLong(Double.doubleToLongBits(v));
-  }
-
-  public void close() {
-    IOExceptionUtils.closeQuietly(out);
-  }
-
-}


Mime
View raw message