parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject parquet-mr git commit: PARQUET-400: Replace CompatibilityUtil with SeekableInputStream.
Date Tue, 16 Aug 2016 17:12:16 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master c8d78b21b -> 898f3d0f6


PARQUET-400: Replace CompatibilityUtil with SeekableInputStream.

This fixes PARQUET-400 by replacing `CompatibilityUtil` with `SeekableInputStream`, an abstraction that is implemented for both Hadoop 1 and Hadoop 2. The benefit of this approach is that `SeekableInputStream` can also be implemented for non-Hadoop file systems in the future.

This also changes the default Hadoop version to Hadoop 2. The library is still compatible with Hadoop 1.x, but building against Hadoop 2 by default makes it much easier to compile Hadoop 2 classes, like `H2SeekableInputStream`, and removes the need for multiple Hadoop versions during compilation.

Author: Ryan Blue <blue@apache.org>

Closes #349 from rdblue/PARQUET-400-byte-buffers and squashes the following commits:

1bcb8a8 [Ryan Blue] PARQUET-400: Fix review nits.
823ca00 [Ryan Blue] PARQUET-400: Add tests for Hadoop 2 readFully.
02d3709 [Ryan Blue] PARQUET-400: Remove unused property.
b543013 [Ryan Blue] PARQUET-400: Fix logger for HadoopStreams.
2cb6934 [Ryan Blue] PARQUET-400: Remove H2SeekableInputStream tests.
abaa695 [Ryan Blue] PARQUET-400: Fix review items.
5dc50a5 [Ryan Blue] PARQUET-400: Add tests for H1SeekableInputStream methods.
730a9e2 [Ryan Blue] PARQUET-400: Move SeekableInputStream to io package.
506a556 [Ryan Blue] PARQUET-400: Remove Hadoop dependencies from SeekableInputStream.
c80580c [Ryan Blue] PARQUET-400: Handle UnsupportedOperationException from read(ByteBuffer).
ba08b3f [Ryan Blue] PARQUET-400: Replace CompatibilityUtil with SeekableInputStream.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/898f3d0f
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/898f3d0f
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/898f3d0f

Branch: refs/heads/master
Commit: 898f3d0f652f313473c67fef32e22d94d8294d4f
Parents: c8d78b2
Author: Ryan Blue <blue@apache.org>
Authored: Tue Aug 16 10:12:00 2016 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Tue Aug 16 10:12:00 2016 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 .../apache/parquet/io/SeekableInputStream.java  | 106 +++
 .../parquet/hadoop/ParquetFileReader.java       |  31 +-
 .../parquet/hadoop/ParquetFileWriter.java       |  17 +-
 .../parquet/hadoop/util/CompatibilityUtil.java  | 114 ---
 .../hadoop/util/H1SeekableInputStream.java      | 154 ++++
 .../hadoop/util/H2SeekableInputStream.java      | 107 +++
 .../parquet/hadoop/util/HadoopStreams.java      | 100 +++
 .../parquet/hadoop/util/MockInputStream.java    |  87 +++
 .../hadoop/util/TestHadoop1ByteBufferReads.java | 761 +++++++++++++++++++
 .../hadoop/util/TestHadoop2ByteBufferReads.java | 405 ++++++++++
 pom.xml                                         |  14 +-
 12 files changed, 1761 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 890a372..ff9b356 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -24,8 +24,8 @@ before_install:
   - cd ..
 
 env:
-  - HADOOP_PROFILE=default TEST_CODECS=uncompressed
-  - HADOOP_PROFILE=hadoop-2 TEST_CODECS=gzip,snappy
+  - HADOOP_PROFILE=hadoop-1 TEST_CODECS=uncompressed
+  - HADOOP_PROFILE=default TEST_CODECS=gzip,snappy
 
 install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false)
 script: mvn test -P $HADOOP_PROFILE

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
new file mode 100644
index 0000000..7247817
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/io/SeekableInputStream.java
@@ -0,0 +1,106 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * {@code SeekableInputStream} is an abstract class with the methods needed by
+ * Parquet to read data from a file or Hadoop data stream.
+ * <p>
+ * It extends {@link InputStream} with position tracking, seeking, and reads
+ * into byte arrays and {@link ByteBuffer}s.
+ */
+public abstract class SeekableInputStream extends InputStream {
+
+  /**
+   * Return the current position in the InputStream.
+   *
+   * @return current position in bytes from the start of the stream
+   * @throws IOException If the underlying stream throws IOException
+   */
+  public abstract long getPos() throws IOException;
+
+  /**
+   * Seek to a new position in the InputStream.
+   *
+   * @param newPos the new position to seek to
+   * @throws IOException If the underlying stream throws IOException
+   */
+  public abstract void seek(long newPos) throws IOException;
+
+  /**
+   * Read a byte array of data, from position 0 to the end of the array.
+   * <p>
+   * This method is equivalent to {@code readFully(bytes, 0, bytes.length)}.
+   * <p>
+   * This method will block until {@code bytes.length} bytes are available to
+   * copy into the array, or will throw {@link EOFException} if the stream ends
+   * before the array is full.
+   *
+   * @param bytes a byte array to fill with data from the stream
+   * @throws IOException If the underlying stream throws IOException
+   * @throws EOFException If the stream has fewer bytes left than are needed to
+   *                      fill the array, {@code bytes.length}
+   */
+  public abstract void readFully(byte[] bytes) throws IOException;
+
+  /**
+   * Read {@code len} bytes of data into an array, at position {@code start}.
+   * <p>
+   * This method will block until {@code len} bytes are available to copy into
+   * the array, or will throw {@link EOFException} if the stream ends before
+   * {@code len} bytes have been read.
+   *
+   * @param bytes a byte array to fill with data from the stream
+   * @param start the index in {@code bytes} at which the first byte is stored
+   * @param len the number of bytes to read
+   * @throws IOException If the underlying stream throws IOException
+   * @throws EOFException If the stream has fewer than {@code len} bytes left
+   */
+  public abstract void readFully(byte[] bytes, int start, int len) throws IOException;
+
+  /**
+   * Read up to {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+   * <p>
+   * This method will copy available bytes into the buffer, reading at most
+   * {@code buf.remaining()} bytes. The number of bytes actually copied is
+   * returned by the method, or -1 is returned to signal that the end of the
+   * underlying stream has been reached.
+   *
+   * @param buf a byte buffer to fill with data from the stream
+   * @return the number of bytes read or -1 if the stream ended
+   * @throws IOException If the underlying stream throws IOException
+   */
+  public abstract int read(ByteBuffer buf) throws IOException;
+
+  /**
+   * Read {@code buf.remaining()} bytes of data into a {@link ByteBuffer}.
+   * <p>
+   * This method will block until {@code buf.remaining()} bytes are available
+   * to copy into the buffer, or will throw {@link EOFException} if the stream
+   * ends before the buffer is full.
+   *
+   * @param buf a byte buffer to fill with data from the stream
+   * @throws IOException If the underlying stream throws IOException
+   * @throws EOFException If the stream has fewer bytes left than are needed to
+   *                      fill the buffer, {@code buf.remaining()}
+   */
+  public abstract void readFully(ByteBuffer buf) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 83542d5..59a7e46 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -54,7 +54,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -66,7 +65,6 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
-import org.apache.parquet.hadoop.util.CompatibilityUtil;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
@@ -91,6 +89,8 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
 
@@ -432,7 +432,7 @@ public class ParquetFileReader implements Closeable {
    */
   public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
     FileSystem fileSystem = file.getPath().getFileSystem(configuration);
-    FSDataInputStream in = fileSystem.open(file.getPath());
+    SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
     try {
       return readFooter(file.getLen(), file.getPath().toString(), in, filter);
     } finally {
@@ -449,7 +449,7 @@ public class ParquetFileReader implements Closeable {
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
    */
-  public static final ParquetMetadata readFooter(long fileLen, String filePath, FSDataInputStream f, MetadataFilter filter) throws IOException {
+  public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
     if (Log.DEBUG) {
       LOG.debug("File length " + fileLen);
     }
@@ -493,7 +493,7 @@ public class ParquetFileReader implements Closeable {
   }
 
   private final CodecFactory codecFactory;
-  private final FSDataInputStream f;
+  private final SeekableInputStream f;
   private final FileStatus fileStatus;
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
   private final FileMetaData fileMetaData; // may be null
@@ -531,7 +531,7 @@ public class ParquetFileReader implements Closeable {
     this.conf = configuration;
     this.fileMetaData = fileMetaData;
     FileSystem fs = filePath.getFileSystem(configuration);
-    this.f = fs.open(filePath);
+    this.f = HadoopStreams.wrap(fs.open(filePath));
     this.fileStatus = fs.getFileStatus(filePath);
     this.blocks = blocks;
     for (ColumnDescriptor col : columns) {
@@ -562,7 +562,7 @@ public class ParquetFileReader implements Closeable {
     this.conf = conf;
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
-    this.f = fs.open(file);
+    this.f = HadoopStreams.wrap(fs.open(file));
     this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
     this.fileMetaData = footer.getFileMetaData();
     this.blocks = footer.getBlocks();
@@ -585,7 +585,7 @@ public class ParquetFileReader implements Closeable {
     this.conf = conf;
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
-    this.f = fs.open(file);
+    this.f = HadoopStreams.wrap(fs.open(file));
     this.footer = footer;
     this.fileMetaData = footer.getFileMetaData();
     this.blocks = footer.getBlocks();
@@ -772,7 +772,7 @@ public class ParquetFileReader implements Closeable {
   }
 
   private static DictionaryPage readCompressedDictionary(
-      PageHeader pageHeader, FSDataInputStream fin) throws IOException {
+      PageHeader pageHeader, SeekableInputStream fin) throws IOException {
     DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
 
     int uncompressedPageSize = pageHeader.getUncompressed_page_size();
@@ -940,7 +940,7 @@ public class ParquetFileReader implements Closeable {
    */
   private class WorkaroundChunk extends Chunk {
 
-    private final FSDataInputStream f;
+    private final SeekableInputStream f;
 
     /**
      * @param descriptor the descriptor of the chunk
@@ -948,7 +948,7 @@ public class ParquetFileReader implements Closeable {
      * @param offset where the chunk starts in data
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, FSDataInputStream f) {
+    private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, SeekableInputStream f) {
       super(descriptor, byteBuf, offset);
       this.f = f;
     }
@@ -964,7 +964,7 @@ public class ParquetFileReader implements Closeable {
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
+        this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
         LOG.info("completing the column chunk to read the page header");
         pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
       }
@@ -1050,11 +1050,14 @@ public class ParquetFileReader implements Closeable {
      * @return the chunks
      * @throws IOException
      */
-    public List<Chunk> readAll(FSDataInputStream f) throws IOException {
+    public List<Chunk> readAll(SeekableInputStream f) throws IOException {
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
+
+      // Allocate the bytebuffer based on whether the FS can support it.
       ByteBuffer chunksByteBuffer = allocator.allocate(length);
-      CompatibilityUtil.getBuf(f, chunksByteBuffer, length);
+      f.readFully(chunksByteBuffer);
+
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
       int currentChunkOffset = 0;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 523d01f..f0fa7f5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -61,6 +61,8 @@ import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopStreams;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
@@ -495,6 +497,12 @@ public class ParquetFileWriter {
   public void appendRowGroups(FSDataInputStream file,
                               List<BlockMetaData> rowGroups,
                               boolean dropColumns) throws IOException {
+    appendRowGroups(HadoopStreams.wrap(file), rowGroups, dropColumns);
+  }
+
+  public void appendRowGroups(SeekableInputStream file,
+                              List<BlockMetaData> rowGroups,
+                              boolean dropColumns) throws IOException {
     for (BlockMetaData block : rowGroups) {
       appendRowGroup(file, block, dropColumns);
     }
@@ -502,6 +510,11 @@ public class ParquetFileWriter {
 
   public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
                              boolean dropColumns) throws IOException {
+    // Wrap the Hadoop stream so this call binds to the SeekableInputStream
+    // overload. Passing `from` unchanged resolves back to this same method
+    // and recurses until the stack overflows.
+    appendRowGroup(HadoopStreams.wrap(from), rowGroup, dropColumns);
+  }
+
+  public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
+    boolean dropColumns) throws IOException {
     startBlock(rowGroup.getRowCount());
 
     Map<String, ColumnChunkMetaData> columnsToCopy =
@@ -596,8 +609,8 @@ public class ParquetFileWriter {
    * @param length the number of bytes to copy
    * @throws IOException
    */
-  private static void copy(FSDataInputStream from, FSDataOutputStream to,
-                          long start, long length) throws IOException{
+  private static void copy(SeekableInputStream from, FSDataOutputStream to,
+                           long start, long length) throws IOException{
     if (DEBUG) LOG.debug(
         "Copying " + length + " bytes at " + start + " to " + to.getPos());
     from.seek(start);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
deleted file mode 100644
index bacf222..0000000
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.parquet.hadoop.util;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.parquet.ShouldNeverHappenException;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-
-public class CompatibilityUtil {
-
-  // Will be set to true if the implementation of FSDataInputSteam supports
-  // the 2.x APIs, in particular reading using a provided ByteBuffer
-  private static boolean useV21;
-  public static final V21FileAPI fileAPI;
-
-  private static class V21FileAPI {
-    private final Method PROVIDE_BUF_READ_METHOD;
-    private final Class<?> FSDataInputStreamCls;
-
-    private V21FileAPI() throws ReflectiveOperationException {
-      final String PACKAGE = "org.apache.hadoop";
-      FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream");
-      PROVIDE_BUF_READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBuffer.class);
-    }
-  }
-  
-  static {
-    // Test to see if a class from the Hadoop 2.x API is available
-    boolean v21 = true;
-    try {
-      Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
-    } catch (ClassNotFoundException cnfe) {
-      v21 = false;
-    }
-
-    useV21 = v21;
-    try {
-      if (v21) {
-        fileAPI = new V21FileAPI();
-      } else {
-        fileAPI = null;
-      }
-
-    } catch (ReflectiveOperationException e) {
-      throw new IllegalArgumentException("Error finding appropriate interfaces using reflection.", e);
-    }
-  }
-
-  private static Object invoke(Method method, String errorMsg, Object instance, Object... args) {
-    try {
-      return method.invoke(instance, args);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException(errorMsg, e);
-    } catch (InvocationTargetException e) {
-      throw new IllegalArgumentException(errorMsg, e);
-    }
-  }
-
-  public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
-    int res;
-    if (useV21) {
-      try {
-        res = (Integer) fileAPI.PROVIDE_BUF_READ_METHOD.invoke(f, readBuf);
-      } catch (InvocationTargetException e) {
-        if (e.getCause() instanceof UnsupportedOperationException) {
-          // the FSDataInputStream docs say specifically that implementations
-          // can choose to throw UnsupportedOperationException, so this should
-          // be a reasonable check to make to see if the interface is
-          // present but not implemented and we should be falling back
-          useV21 = false;
-          return getBuf(f, readBuf, maxSize);
-        } else if (e.getCause() instanceof IOException) {
-          throw (IOException) e.getCause();
-        } else {
-          // To handle any cases where a Runtime exception occurs and provide
-          // some additional context information. A stacktrace would just give
-          // a line number, this at least tells them we were using the version
-          // of the read method designed for using a ByteBuffer.
-          throw new IOException("Error reading out of an FSDataInputStream " +
-              "using the Hadoop 2 ByteBuffer based read method.", e.getCause());
-        }
-      } catch (IllegalAccessException e) {
-        // This method is public because it is defined in an interface,
-        // there should be no problems accessing it
-        throw new ShouldNeverHappenException(e);
-      }
-    } else {
-      byte[] buf = new byte[maxSize];
-      res = f.read(buf);
-      readBuf.put(buf, 0, res);
-    }
-    return res;
-  }
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
new file mode 100644
index 0000000..4a03b1a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H1SeekableInputStream.java
@@ -0,0 +1,154 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.io.SeekableInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * SeekableInputStream implementation that implements read(ByteBuffer) for
+ * Hadoop 1 FSDataInputStream.
+ * <p>
+ * Buffers that expose a backing array are filled in place; all other buffers
+ * are filled by copying through an intermediate byte array.
+ */
+class H1SeekableInputStream extends SeekableInputStream {
+
+  private static final int COPY_BUFFER_SIZE = 8192;
+
+  // Scratch array used to copy data into buffers without a backing array.
+  private final byte[] temp = new byte[COPY_BUFFER_SIZE];
+
+  private final FSDataInputStream stream;
+
+  public H1SeekableInputStream(FSDataInputStream stream) {
+    this.stream = stream;
+  }
+
+  @Override
+  public void close() throws IOException {
+    stream.close();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return stream.getPos();
+  }
+
+  @Override
+  public void seek(long newPos) throws IOException {
+    stream.seek(newPos);
+  }
+
+  @Override
+  public int read() throws IOException {
+    return stream.read();
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return stream.read(b, off, len);
+  }
+
+  @Override
+  public void readFully(byte[] bytes) throws IOException {
+    stream.readFully(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void readFully(byte[] bytes, int start, int len) throws IOException {
+    // Honor the requested range; readFully(bytes) would ignore start/len and
+    // fill the whole array from index 0.
+    stream.readFully(bytes, start, len);
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    if (buf.hasArray()) {
+      return readHeapBuffer(stream, buf);
+    } else {
+      return readDirectBuffer(stream, buf, temp);
+    }
+  }
+
+  @Override
+  public void readFully(ByteBuffer buf) throws IOException {
+    if (buf.hasArray()) {
+      readFullyHeapBuffer(stream, buf);
+    } else {
+      readFullyDirectBuffer(stream, buf, temp);
+    }
+  }
+
+  /**
+   * Performs a single read directly into the backing array of {@code buf} and
+   * advances the buffer position by the number of bytes read.
+   *
+   * @return the number of bytes read, or the negative value returned by the
+   *         stream at end of stream (the buffer position is then unchanged)
+   */
+  // Visible for testing
+  static int readHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
+    int bytesRead = f.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+    if (bytesRead < 0) {
+      // if this resulted in EOF, don't update position
+      return bytesRead;
+    } else {
+      buf.position(buf.position() + bytesRead);
+      return bytesRead;
+    }
+  }
+
+  /**
+   * Fills the remaining space of {@code buf} through its backing array and
+   * moves the buffer position to its limit.
+   */
+  // Visible for testing
+  static void readFullyHeapBuffer(FSDataInputStream f, ByteBuffer buf) throws IOException {
+    f.readFully(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining());
+    buf.position(buf.limit());
+  }
+
+  /**
+   * Copies data into {@code buf} through {@code temp}, continuing only while
+   * each read fills {@code temp} completely.
+   *
+   * @return the total number of bytes copied, or -1 if the stream ended before
+   *         any byte was copied
+   */
+  // Visible for testing
+  static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
+    // copy all the bytes that return immediately, stopping at the first
+    // read that doesn't return a full buffer.
+    int nextReadLength = Math.min(buf.remaining(), temp.length);
+    int totalBytesRead = 0;
+    int bytesRead;
+
+    while ((bytesRead = f.read(temp, 0, nextReadLength)) == temp.length) {
+      buf.put(temp);
+      totalBytesRead += bytesRead;
+      nextReadLength = Math.min(buf.remaining(), temp.length);
+    }
+
+    if (bytesRead < 0) {
+      // return -1 if nothing was read
+      return totalBytesRead == 0 ? -1 : totalBytesRead;
+    } else {
+      // copy the last partial buffer
+      buf.put(temp, 0, bytesRead);
+      totalBytesRead += bytesRead;
+      return totalBytesRead;
+    }
+  }
+
+  /**
+   * Copies data into {@code buf} through {@code temp} until the buffer is
+   * full.
+   *
+   * @throws EOFException if the stream ends while the buffer still has space
+   */
+  // Visible for testing
+  static void readFullyDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
+    int nextReadLength = Math.min(buf.remaining(), temp.length);
+    int bytesRead = 0;
+
+    while (nextReadLength > 0 && (bytesRead = f.read(temp, 0, nextReadLength)) >= 0) {
+      buf.put(temp, 0, bytesRead);
+      nextReadLength = Math.min(buf.remaining(), temp.length);
+    }
+
+    if (bytesRead < 0 && buf.remaining() > 0) {
+      throw new EOFException(
+          "Reached the end of stream. Still have: " + buf.remaining() + " bytes left");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
new file mode 100644
index 0000000..a706546
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -0,0 +1,107 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.io.SeekableInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * SeekableInputStream implementation for FSDataInputStream that implements
+ * ByteBufferReadable in Hadoop 2.
+ */
+class H2SeekableInputStream extends SeekableInputStream {
+
+  // Visible for testing
+  interface Reader {
+    int read(ByteBuffer buf) throws IOException;
+  }
+
+  private final FSDataInputStream stream;
+  private final Reader reader;
+
+  public H2SeekableInputStream(FSDataInputStream stream) {
+    this.stream = stream;
+    this.reader = new H2Reader();
+  }
+
+  @Override
+  public void close() throws IOException {
+    // InputStream.close() is a no-op, so without this override the wrapped
+    // stream would never be closed.
+    stream.close();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return stream.getPos();
+  }
+
+  @Override
+  public void seek(long newPos) throws IOException {
+    stream.seek(newPos);
+  }
+
+  @Override
+  public int read() throws IOException {
+    return stream.read();
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return stream.read(b, off, len);
+  }
+
+  @Override
+  public void readFully(byte[] bytes) throws IOException {
+    stream.readFully(bytes, 0, bytes.length);
+  }
+
+  @Override
+  public void readFully(byte[] bytes, int start, int len) throws IOException {
+    // Honor the requested range; readFully(bytes) would ignore start/len and
+    // fill the whole array from index 0.
+    stream.readFully(bytes, start, len);
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    return stream.read(buf);
+  }
+
+  @Override
+  public void readFully(ByteBuffer buf) throws IOException {
+    readFully(reader, buf);
+  }
+
+  // Adapts the wrapped stream's read(ByteBuffer) to the Reader interface.
+  private class H2Reader implements Reader {
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+      return stream.read(buf);
+    }
+  }
+
+  /**
+   * Reads from {@code reader} until {@code buf} has no space remaining.
+   *
+   * @param reader the source of bytes
+   * @param buf the buffer to fill
+   * @throws IOException if the reader throws IOException
+   * @throws EOFException if the reader returns -1 while the buffer still has
+   *                      space remaining
+   */
+  public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
+    // unfortunately the Hadoop APIs seem to not have a 'readFully' equivalent for the byteBuffer read
+    // calls. The read(ByteBuffer) call might read fewer than buf.remaining() bytes. Thus we
+    // have to loop to ensure we read them.
+    while (buf.hasRemaining()) {
+      int readCount = reader.read(buf);
+      if (readCount == -1) {
+        // this is probably a bug in the ParquetReader. We shouldn't have called readFully with a buffer
+        // that has more remaining than the amount of data in the stream.
+        throw new EOFException("Reached the end of stream. Still have: " + buf.remaining() + " bytes left");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
new file mode 100644
index 0000000..7c321cd
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -0,0 +1,100 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.Log;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.SeekableInputStream;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Convenience methods to get Parquet abstractions for Hadoop data streams.
+ */
+public class HadoopStreams {
+
+  private static final Log LOG = Log.getLog(HadoopStreams.class);
+
+  private static final Class<?> byteBufferReadableClass = getReadableClass();
+  static final Constructor<SeekableInputStream> h2SeekableConstructor = getH2SeekableConstructor();
+
+  /**
+   * Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
+   * implementation for Parquet readers.
+   *
+   * @param stream a Hadoop FSDataInputStream
+   * @return a SeekableInputStream
+   */
+  public static SeekableInputStream wrap(FSDataInputStream stream) {
+    if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
+        byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
+      try {
+        return h2SeekableConstructor.newInstance(stream);
+      } catch (InstantiationException e) {
+        LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
+        return new H1SeekableInputStream(stream);
+      } catch (IllegalAccessException e) {
+        LOG.warn("Could not instantiate H2SeekableInputStream, falling back to byte array reads", e);
+        return new H1SeekableInputStream(stream);
+      } catch (InvocationTargetException e) {
+        throw new ParquetDecodingException(
+            "Could not instantiate H2SeekableInputStream", e.getTargetException());
+      }
+    } else {
+      return new H1SeekableInputStream(stream);
+    }
+  }
+
+  private static Class<?> getReadableClass() {
+    try {
+      return Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
+    } catch (ClassNotFoundException e) {
+      return null;
+    } catch (NoClassDefFoundError e) {
+      return null;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private static Class<SeekableInputStream> getH2SeekableClass() {
+    try {
+      return (Class<SeekableInputStream>) Class.forName(
+          "org.apache.parquet.hadoop.util.H2SeekableInputStream");
+    } catch (ClassNotFoundException e) {
+      return null;
+    } catch (NoClassDefFoundError e) {
+      return null;
+    }
+  }
+
+  private static Constructor<SeekableInputStream> getH2SeekableConstructor() {
+    Class<SeekableInputStream> h2SeekableClass = getH2SeekableClass();
+    if (h2SeekableClass != null) {
+      try {
+        return h2SeekableClass.getConstructor(FSDataInputStream.class);
+      } catch (NoSuchMethodException e) {
+        return null;
+      }
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
new file mode 100644
index 0000000..a112288
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockInputStream.java
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+/**
+ * Test stream over {@link #TEST_ARRAY} that can be told to return fewer bytes than
+ * requested from {@code read(byte[], int, int)}.
+ * <p>
+ * Each constructor argument is a byte budget. Reads are served from the current budget;
+ * a request larger than what is left of the budget returns only the remainder and moves
+ * on to the next budget. Once all budgets are used up, reads are unrestricted.
+ */
+class MockInputStream extends ByteArrayInputStream
+    implements Seekable, PositionedReadable {
+  static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+
+  private int[] lengths;
+  private int current = 0;
+
+  MockInputStream(int... actualReadLengths) {
+    super(TEST_ARRAY);
+    this.lengths = actualReadLengths;
+  }
+
+  @Override
+  public synchronized int read(byte[] b, int off, int len) {
+    if (current >= lengths.length) {
+      // every configured budget has been consumed: behave like a plain ByteArrayInputStream
+      return super.read(b, off, len);
+    }
+
+    int budget = lengths[current];
+    if (len > budget) {
+      // request exceeds the budget: hand back only what is left and advance
+      int bytesRead = super.read(b, off, budget);
+      current += 1;
+      return bytesRead;
+    }
+
+    // when len == budget, the budget drops to 0 and the next read will be 0 bytes
+    int bytesRead = super.read(b, off, len);
+    lengths[current] -= bytesRead;
+    return bytesRead;
+  }
+
+  /**
+   * Seeks to {@code position} and reads from there; the previous position is not restored.
+   */
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    seek(position);
+    return read(buffer, offset, length);
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    throw new UnsupportedOperationException("Not actually supported.");
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    throw new UnsupportedOperationException("Not actually supported.");
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    // the inherited position field is an int, so the target is narrowed
+    this.pos = (int) pos;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return this.pos;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    seek(targetPos);
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
new file mode 100644
index 0000000..9e4e2a9
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop1ByteBufferReads.java
@@ -0,0 +1,761 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+
+import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+
+public class TestHadoop1ByteBufferReads {
+
+  // Per-thread scratch array handed to the direct-buffer read helpers as their temp buffer.
+  private static final ThreadLocal<byte[]> TEMP = new ThreadLocal<byte[]>() {
+    @Override
+    protected byte[] initialValue() {
+      final int scratchSize = 8192;
+      return new byte[scratchSize];
+    }
+  };
+
+  /**
+   * Heap buffer larger than the stream: the first read returns all 10 bytes and the
+   * second read returns -1.
+   */
+  @Test
+  public void testHeapRead() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream());
+    ByteBuffer target = ByteBuffer.allocate(20);
+
+    int count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(10, count);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(20, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(-1, count);
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  /**
+   * Heap buffer smaller than the stream: the first read fills all 5 bytes and a read
+   * into the full buffer returns 0.
+   */
+  @Test
+  public void testHeapSmallBuffer() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream());
+    ByteBuffer target = ByteBuffer.allocate(5);
+
+    int count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(5, count);
+    Assert.assertEquals(5, target.position());
+    Assert.assertEquals(5, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(0, count);
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 5), target);
+  }
+
+  /**
+   * The stream hands back 2, 3, 3 and then the final 2 bytes; each heap read advances
+   * the buffer position by exactly the returned count.
+   */
+  @Test
+  public void testHeapSmallReads() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    ByteBuffer target = ByteBuffer.allocate(10);
+
+    int count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(2, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(3, count);
+    Assert.assertEquals(5, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(3, count);
+    Assert.assertEquals(8, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  /**
+   * Heap reads start at the buffer's current position (10) rather than at index 0.
+   */
+  @Test
+  public void testHeapPosition() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(8));
+
+    ByteBuffer target = ByteBuffer.allocate(20);
+    target.position(10);
+    target.mark();
+
+    int count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(8, count);
+    Assert.assertEquals(18, target.position());
+    Assert.assertEquals(20, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(20, target.position());
+    Assert.assertEquals(20, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(-1, count);
+
+    // rewind to the mark so only the bytes written by the reads are compared
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  /**
+   * Heap reads stop at the buffer's limit (8) even though the backing array is larger.
+   */
+  @Test
+  public void testHeapLimit() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(7));
+
+    ByteBuffer target = ByteBuffer.allocate(20);
+    target.limit(8);
+
+    int count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(7, count);
+    Assert.assertEquals(7, target.position());
+    Assert.assertEquals(8, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(1, count);
+    Assert.assertEquals(8, target.position());
+    Assert.assertEquals(8, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(0, count);
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), target);
+  }
+
+  /**
+   * Heap reads honor both a non-zero position (5) and a reduced limit (13).
+   */
+  @Test
+  public void testHeapPositionAndLimit() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(7));
+
+    ByteBuffer target = ByteBuffer.allocate(20);
+    target.position(5);
+    target.limit(13);
+    target.mark();
+
+    int count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(7, count);
+    Assert.assertEquals(12, target.position());
+    Assert.assertEquals(13, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(1, count);
+    Assert.assertEquals(13, target.position());
+    Assert.assertEquals(13, target.limit());
+
+    count = H1SeekableInputStream.readHeapBuffer(in, target);
+    Assert.assertEquals(0, count);
+
+    // rewind to the mark so only the bytes written by the reads are compared
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), target);
+  }
+
+  /**
+   * Direct buffer larger than the stream: the first read returns all 10 bytes and the
+   * second read returns -1.
+   */
+  @Test
+  public void testDirectRead() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream());
+    ByteBuffer target = ByteBuffer.allocateDirect(20);
+    byte[] temp = TEMP.get();
+
+    int count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(10, count);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(20, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(-1, count);
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  /**
+   * Direct buffer smaller than the stream: the first read fills all 5 bytes and a read
+   * into the full buffer returns 0.
+   */
+  @Test
+  public void testDirectSmallBuffer() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream());
+    ByteBuffer target = ByteBuffer.allocateDirect(5);
+    byte[] temp = TEMP.get();
+
+    int count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(5, count);
+    Assert.assertEquals(5, target.position());
+    Assert.assertEquals(5, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(0, count);
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 5), target);
+  }
+
+  /**
+   * The stream hands back 2, 3, 3 and then the final 2 bytes; each direct read advances
+   * the buffer position by exactly the returned count.
+   */
+  @Test
+  public void testDirectSmallReads() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    ByteBuffer target = ByteBuffer.allocateDirect(10);
+    byte[] temp = TEMP.get();
+
+    int count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(2, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(3, count);
+    Assert.assertEquals(5, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(3, count);
+    Assert.assertEquals(8, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  /**
+   * Direct reads start at the buffer's current position (10) rather than at index 0.
+   */
+  @Test
+  public void testDirectPosition() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(8));
+    byte[] temp = TEMP.get();
+
+    ByteBuffer target = ByteBuffer.allocateDirect(20);
+    target.position(10);
+    target.mark();
+
+    int count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(8, count);
+    Assert.assertEquals(18, target.position());
+    Assert.assertEquals(20, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(20, target.position());
+    Assert.assertEquals(20, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(-1, count);
+
+    // rewind to the mark so only the bytes written by the reads are compared
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  /**
+   * Direct reads stop at the buffer's limit (8) even though its capacity is larger.
+   */
+  @Test
+  public void testDirectLimit() throws Exception {
+    // Allocate a direct buffer, like the other direct tests; a heap buffer here would
+    // leave the direct-buffer limit case unexercised.
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+    readBuffer.limit(8);
+
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(7));
+
+    int len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, len);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(1, len);
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    // the buffer is full, so a further read transfers nothing
+    len = H1SeekableInputStream.readDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(0, len);
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  /**
+   * Direct reads honor both a non-zero position (5) and a reduced limit (13).
+   */
+  @Test
+  public void testDirectPositionAndLimit() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(7));
+    byte[] temp = TEMP.get();
+
+    ByteBuffer target = ByteBuffer.allocateDirect(20);
+    target.position(5);
+    target.limit(13);
+    target.mark();
+
+    int count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(7, count);
+    Assert.assertEquals(12, target.position());
+    Assert.assertEquals(13, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(1, count);
+    Assert.assertEquals(13, target.position());
+    Assert.assertEquals(13, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, temp);
+    Assert.assertEquals(0, count);
+
+    // rewind to the mark so only the bytes written by the reads are compared
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), target);
+  }
+
+  /**
+   * Same short-read sequence as the other small-reads tests, but with a 2-byte temp
+   * array so that readDirectBuffer has to loop.
+   */
+  @Test
+  public void testDirectSmallTempBufferSmallReads() throws Exception {
+    byte[] tinyTemp = new byte[2]; // this will cause readDirectBuffer to loop
+
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    ByteBuffer target = ByteBuffer.allocateDirect(10);
+
+    int count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(2, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(3, count);
+    Assert.assertEquals(5, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(3, count);
+    Assert.assertEquals(8, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(2, count);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(-1, count);
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  /**
+   * Position (5) and limit (13) are honored when a 2-byte temp array forces
+   * readDirectBuffer to loop.
+   */
+  @Test
+  public void testDirectSmallTempBufferWithPositionAndLimit() throws Exception {
+    byte[] tinyTemp = new byte[2]; // this will cause readDirectBuffer to loop
+
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream(7));
+
+    ByteBuffer target = ByteBuffer.allocateDirect(20);
+    target.position(5);
+    target.limit(13);
+    target.mark();
+
+    int count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(7, count);
+    Assert.assertEquals(12, target.position());
+    Assert.assertEquals(13, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(1, count);
+    Assert.assertEquals(13, target.position());
+    Assert.assertEquals(13, target.limit());
+
+    count = H1SeekableInputStream.readDirectBuffer(in, target, tinyTemp);
+    Assert.assertEquals(0, count);
+
+    // rewind to the mark so only the bytes written by the reads are compared
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), target);
+  }
+
+  /**
+   * readFully into an 8-byte heap buffer fills it; a second call on the full buffer
+   * leaves position and limit unchanged.
+   */
+  @Test
+  public void testHeapReadFullySmallBuffer() throws Exception {
+    FSDataInputStream in = new FSDataInputStream(new MockInputStream());
+    ByteBuffer target = ByteBuffer.allocate(8);
+
+    H1SeekableInputStream.readFullyHeapBuffer(in, target);
+    Assert.assertEquals(8, target.position());
+    Assert.assertEquals(8, target.limit());
+
+    H1SeekableInputStream.readFullyHeapBuffer(in, target);
+    Assert.assertEquals(8, target.position());
+    Assert.assertEquals(8, target.limit());
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), target);
+  }
+
+  /**
+   * readFully into a heap buffer larger than the stream throws EOFException and leaves
+   * the buffer position at 0.
+   */
+  @Test
+  public void testHeapReadFullyLargeBuffer() throws Exception {
+    final FSDataInputStream in = new FSDataInputStream(new MockInputStream());
+    final ByteBuffer target = ByteBuffer.allocate(20);
+
+    TestUtils.assertThrows("Should throw EOFException",
+        EOFException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            H1SeekableInputStream.readFullyHeapBuffer(in, target);
+            return null;
+          }
+        });
+
+    Assert.assertEquals(0, target.position());
+    Assert.assertEquals(20, target.limit());
+  }
+
+  @Test
+  public void testHeapReadFullyJustRight() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+    // reads all of the bytes available without EOFException
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    // trying to read 0 more bytes doesn't result in EOFException
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullySmallReads() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyPosition() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+    readBuffer.position(3);
+    readBuffer.mark();
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+    readBuffer.limit(7);
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testHeapReadFullyPositionAndLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(10);
+    readBuffer.position(3);
+    readBuffer.limit(7);
+    readBuffer.mark();
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    H1SeekableInputStream.readFullyHeapBuffer(hadoopStream, readBuffer);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullySmallBuffer() throws Exception {
+    ByteBuffer readBuffer = ByteBuffer.allocateDirect(8);
+
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(8, readBuffer.position());
+    Assert.assertEquals(8, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
+  }
+
+  /**
+   * readFully into a direct buffer larger than the stream throws EOFException after
+   * copying the 10 bytes that were available.
+   */
+  @Test
+  public void testDirectReadFullyLargeBuffer() throws Exception {
+    final FSDataInputStream in = new FSDataInputStream(new MockInputStream());
+    final ByteBuffer target = ByteBuffer.allocateDirect(20);
+
+    TestUtils.assertThrows("Should throw EOFException",
+        EOFException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            H1SeekableInputStream.readFullyDirectBuffer(in, target, TEMP.get());
+            return null;
+          }
+        });
+
+    // NOTE: Unlike readFullyHeapBuffer, the direct variant issues several read
+    // operations that consume input up to the end of the stream, so the buffer
+    // has advanced by the time the exception is thrown. The position is still a
+    // correct value because the bytes in the buffer are valid. The heap variant
+    // can't behave this way without using read instead of readFully on the
+    // underlying FSDataInputStream.
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(20, target.limit());
+  }
+
+  @Test
+  public void testDirectReadFullyJustRight() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+
+    // reads all of the bytes available without EOFException
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    // trying to read 0 more bytes doesn't result in EOFException
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullySmallReads() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullyPosition() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.position(3);
+    readBuffer.mark();
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullyLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.limit(7);
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullyPositionAndLimit() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.position(3);
+    readBuffer.limit(7);
+    readBuffer.mark();
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, TEMP.get());
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+
+  @Test
+  public void testDirectReadFullySmallTempBufferWithPositionAndLimit() throws Exception {
+    byte[] temp = new byte[2]; // this will cause readFully to loop
+
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(10);
+    readBuffer.position(3);
+    readBuffer.limit(7);
+    readBuffer.mark();
+
+    final FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream(2, 3, 3));
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
+    Assert.assertEquals(7, readBuffer.position());
+    Assert.assertEquals(7, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);
+
+    readBuffer.position(7);
+    readBuffer.limit(10);
+    H1SeekableInputStream.readFullyDirectBuffer(hadoopStream, readBuffer, temp);
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(10, readBuffer.limit());
+
+    readBuffer.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
new file mode 100644
index 0000000..86b903c
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop2ByteBufferReads.java
@@ -0,0 +1,405 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.TestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+
+import static org.apache.parquet.hadoop.util.MockInputStream.TEST_ARRAY;
+
+public class TestHadoop2ByteBufferReads {
+
+  /**
+   * Stand-in for Hadoop 2's ByteBuffer reads, backed by byte[] stream reads.
+   */
+  private static class MockBufferReader implements H2SeekableInputStream.Reader {
+    private final FSDataInputStream stream;
+
+    public MockBufferReader(FSDataInputStream stream) {
+      this.stream = stream;
+    }
+
+    @Override
+    public int read(ByteBuffer buf) throws IOException {
+      // inefficient scratch-array copy, but simple for readFully(ByteBuffer) tests
+      final byte[] scratch = new byte[buf.remaining()];
+      final int count = stream.read(scratch, 0, scratch.length);
+      if (count <= 0) {
+        return count;
+      }
+      buf.put(scratch, 0, count);
+      return count;
+    }
+  }
+
+  @Test
+  public void testHeapReadFullySmallBuffer() throws Exception {
+    // An 8-byte heap buffer should be filled completely by a single readFully.
+    final ByteBuffer target = ByteBuffer.allocate(8);
+    final MockBufferReader source =
+        new MockBufferReader(new FSDataInputStream(new MockInputStream()));
+
+    // The first pass fills the buffer; the second has no room left, so the
+    // position and limit must be unchanged.
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(8, target.position());
+      Assert.assertEquals(8, target.limit());
+    }
+
+    target.flip();
+    final ByteBuffer expected = ByteBuffer.wrap(TEST_ARRAY, 0, 8);
+    Assert.assertEquals("Buffer contents should match", expected, target);
+  }
+
+  @Test
+  public void testHeapReadFullyLargeBuffer() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocate(20);
+
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    final MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+    TestUtils.assertThrows("Should throw EOFException",
+        EOFException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            H2SeekableInputStream.readFully(reader, readBuffer);
+            return null;
+          }
+        });
+
+    // NOTE: The bytes that were read before the EOFException remain in the
+    // buffer: the position has advanced to 10 while the limit is still 20.
+    // This is a correct value because the bytes in the buffer are valid. The
+    // same holds for direct buffers with H2SeekableInputStream.readFully (see
+    // testDirectReadFullyLargeBuffer), so, unlike in the Hadoop 1 tests, heap
+    // and direct reads do not differ here.
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+  }
+
+  @Test
+  public void testHeapReadFullyJustRight() throws Exception {
+    // The buffer is sized to hold exactly the bytes the assertions expect.
+    final ByteBuffer target = ByteBuffer.allocate(10);
+
+    final MockBufferReader source =
+        new MockBufferReader(new FSDataInputStream(new MockInputStream()));
+
+    // pass 0: reads all of the bytes available without EOFException
+    // pass 1: trying to read 0 more bytes doesn't result in EOFException
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(10, target.position());
+      Assert.assertEquals(10, target.limit());
+    }
+
+    // The flipped buffer must hold exactly the bytes of the test array.
+    target.flip();
+    final ByteBuffer expected = ByteBuffer.wrap(TEST_ARRAY);
+    Assert.assertEquals("Buffer contents should match", expected, target);
+  }
+
+  @Test
+  public void testHeapReadFullySmallReads() throws Exception {
+    // NOTE(review): the (2, 3, 3) arguments presumably cap the size of each
+    // underlying read, per the test name -- confirm against MockInputStream.
+    final FSDataInputStream input = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    final MockBufferReader source = new MockBufferReader(input);
+    final ByteBuffer target = ByteBuffer.allocate(10);
+
+    // The repeated call has nothing left to fill, so nothing may change.
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(10, target.position());
+      Assert.assertEquals(10, target.limit());
+    }
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  @Test
+  public void testHeapReadFullyPosition() throws Exception {
+    // Start writing at offset 3 and mark it so the filled region can be
+    // revisited after the reads.
+    final ByteBuffer target = ByteBuffer.allocate(10);
+    target.position(3);
+    target.mark();
+
+    final MockBufferReader source =
+        new MockBufferReader(new FSDataInputStream(new MockInputStream(2, 3, 3)));
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(10, target.position());
+      Assert.assertEquals(10, target.limit());
+    }
+
+    // Back at the mark, the 7 remaining bytes are the start of the test data.
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), target);
+  }
+
+  @Test
+  public void testHeapReadFullyLimit() throws Exception {
+    // Only the first 7 bytes of the buffer are writable at first.
+    final ByteBuffer target = ByteBuffer.allocate(10);
+    target.limit(7);
+
+    final FSDataInputStream input = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    final MockBufferReader source = new MockBufferReader(input);
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(7, target.position());
+      Assert.assertEquals(7, target.limit());
+    }
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), target);
+
+    // Open up the last 3 bytes and continue reading from the same stream.
+    target.position(7);
+    target.limit(10);
+    H2SeekableInputStream.readFully(source, target);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    // After the flip the buffer spans all 10 bytes read across both phases.
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  @Test
+  public void testHeapReadFullyPositionAndLimit() throws Exception {
+    // Writable window is [3, 7); the mark remembers where it starts.
+    final ByteBuffer target = ByteBuffer.allocate(10);
+    target.position(3);
+    target.limit(7);
+    target.mark();
+
+    final FSDataInputStream input = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    final MockBufferReader source = new MockBufferReader(input);
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(7, target.position());
+      Assert.assertEquals(7, target.limit());
+    }
+
+    // From the mark to the limit: the first 4 bytes of the test data.
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), target);
+
+    // Extend the window to [7, 10) and read the next 3 bytes.
+    target.position(7);
+    target.limit(10);
+    H2SeekableInputStream.readFully(source, target);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), target);
+  }
+
+  @Test
+  public void testDirectReadFullySmallBuffer() throws Exception {
+    // An 8-byte direct buffer should be filled completely by one readFully.
+    final ByteBuffer target = ByteBuffer.allocateDirect(8);
+    final FSDataInputStream input = new FSDataInputStream(new MockInputStream());
+    final MockBufferReader source = new MockBufferReader(input);
+
+    // The first pass fills the buffer; the second has no room left, so the
+    // position and limit must be unchanged.
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(8, target.position());
+      Assert.assertEquals(8, target.limit());
+    }
+
+    target.flip();
+    final ByteBuffer expected = ByteBuffer.wrap(TEST_ARRAY, 0, 8);
+    Assert.assertEquals("Buffer contents should match", expected, target);
+  }
+
+  @Test
+  public void testDirectReadFullyLargeBuffer() throws Exception {
+    final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20);
+
+    FSDataInputStream hadoopStream = new FSDataInputStream(new MockInputStream());
+    final MockBufferReader reader = new MockBufferReader(hadoopStream);
+
+    TestUtils.assertThrows("Should throw EOFException",
+        EOFException.class, new Callable() {
+          @Override
+          public Object call() throws Exception {
+            H2SeekableInputStream.readFully(reader, readBuffer);
+            return null;
+          }
+        });
+
+    // NOTE: The bytes that were read before the EOFException remain in the
+    // buffer: the position has advanced to 10 while the limit is still 20.
+    // This is a correct value because the bytes in the buffer are valid. The
+    // same holds for heap buffers with H2SeekableInputStream.readFully (see
+    // testHeapReadFullyLargeBuffer), so, unlike in the Hadoop 1 tests, heap
+    // and direct reads do not differ here.
+    Assert.assertEquals(10, readBuffer.position());
+    Assert.assertEquals(20, readBuffer.limit());
+  }
+
+  @Test
+  public void testDirectReadFullyJustRight() throws Exception {
+    // The buffer is sized to hold exactly the bytes the assertions expect.
+    final ByteBuffer target = ByteBuffer.allocateDirect(10);
+
+    final MockBufferReader source =
+        new MockBufferReader(new FSDataInputStream(new MockInputStream()));
+
+    // pass 0: reads all of the bytes available without EOFException
+    // pass 1: trying to read 0 more bytes doesn't result in EOFException
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(10, target.position());
+      Assert.assertEquals(10, target.limit());
+    }
+
+    // The flipped buffer must hold exactly the bytes of the test array.
+    target.flip();
+    final ByteBuffer expected = ByteBuffer.wrap(TEST_ARRAY);
+    Assert.assertEquals("Buffer contents should match", expected, target);
+  }
+
+  @Test
+  public void testDirectReadFullySmallReads() throws Exception {
+    // NOTE(review): the (2, 3, 3) arguments presumably cap the size of each
+    // underlying read, per the test name -- confirm against MockInputStream.
+    final FSDataInputStream input = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    final MockBufferReader source = new MockBufferReader(input);
+    final ByteBuffer target = ByteBuffer.allocateDirect(10);
+
+    // The repeated call has nothing left to fill, so nothing may change.
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(10, target.position());
+      Assert.assertEquals(10, target.limit());
+    }
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  @Test
+  public void testDirectReadFullyPosition() throws Exception {
+    // Start writing at offset 3 and mark it so the filled region can be
+    // revisited after the reads.
+    final ByteBuffer target = ByteBuffer.allocateDirect(10);
+    target.position(3);
+    target.mark();
+
+    final MockBufferReader source =
+        new MockBufferReader(new FSDataInputStream(new MockInputStream(2, 3, 3)));
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(10, target.position());
+      Assert.assertEquals(10, target.limit());
+    }
+
+    // Back at the mark, the 7 remaining bytes are the start of the test data.
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), target);
+  }
+
+  @Test
+  public void testDirectReadFullyLimit() throws Exception {
+    // Only the first 7 bytes of the buffer are writable at first.
+    final ByteBuffer target = ByteBuffer.allocateDirect(10);
+    target.limit(7);
+
+    final FSDataInputStream input = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    final H2SeekableInputStream.Reader source = new MockBufferReader(input);
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(7, target.position());
+      Assert.assertEquals(7, target.limit());
+    }
+
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), target);
+
+    // Open up the last 3 bytes and continue reading from the same stream.
+    target.position(7);
+    target.limit(10);
+    H2SeekableInputStream.readFully(source, target);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    // After the flip the buffer spans all 10 bytes read across both phases.
+    target.flip();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY), target);
+  }
+
+  @Test
+  public void testDirectReadFullyPositionAndLimit() throws Exception {
+    // Writable window is [3, 7); the mark remembers where it starts.
+    final ByteBuffer target = ByteBuffer.allocateDirect(10);
+    target.position(3);
+    target.limit(7);
+    target.mark();
+
+    final FSDataInputStream input = new FSDataInputStream(new MockInputStream(2, 3, 3));
+    final MockBufferReader source = new MockBufferReader(input);
+    for (int pass = 0; pass < 2; pass++) {
+      H2SeekableInputStream.readFully(source, target);
+      Assert.assertEquals(7, target.position());
+      Assert.assertEquals(7, target.limit());
+    }
+
+    // From the mark to the limit: the first 4 bytes of the test data.
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 4), target);
+
+    // Extend the window to [7, 10) and read the next 3 bytes.
+    target.position(7);
+    target.limit(10);
+    H2SeekableInputStream.readFully(source, target);
+    Assert.assertEquals(10, target.position());
+    Assert.assertEquals(10, target.limit());
+
+    target.reset();
+    Assert.assertEquals("Buffer contents should match",
+        ByteBuffer.wrap(TEST_ARRAY, 0, 7), target);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/898f3d0f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2d6d7d2..ca34309 100644
--- a/pom.xml
+++ b/pom.xml
@@ -69,7 +69,8 @@
     <jackson.version>1.9.11</jackson.version>
     <jackson.package>org.codehaus.jackson</jackson.package>
     <shade.prefix>shaded.parquet</shade.prefix>
-    <hadoop.version>1.1.0</hadoop.version>
+    <hadoop.version>2.3.0</hadoop.version>
+    <hadoop1.version>1.1.0</hadoop1.version>
     <cascading.version>2.5.3</cascading.version>
     <cascading3.version>3.0.3</cascading3.version>
     <parquet.format.version>2.3.1</parquet.format.version>
@@ -80,7 +81,7 @@
     <scala.binary.version>2.10</scala.binary.version>
     <scala.maven.test.skip>false</scala.maven.test.skip>
     <pig.version>0.14.0</pig.version>
-    <pig.classifier/>
+    <pig.classifier>h2</pig.classifier>
     <thrift.version>0.7.0</thrift.version>
     <fastutil.version>6.5.7</fastutil.version>
     <semver.api.version>0.9.33</semver.api.version>
@@ -509,19 +510,18 @@
     </profile>
 
     <profile>
-      <id>hadoop-2</id>
+      <id>hadoop-1</id>
       <activation>
         <property>
           <name>hadoop.profile</name>
-          <value>hadoop2</value>
+          <value>hadoop1</value>
         </property>
       </activation>
       <properties>
         <!-- test hadoop-1 with the same jars that were produced for default profile -->
         <maven.main.skip>true</maven.main.skip>
-        <hadoop.version>2.3.0</hadoop.version>
-        <pig.version>0.14.0</pig.version>
-        <pig.classifier>h2</pig.classifier>
+        <hadoop.version>${hadoop1.version}</hadoop.version>
+        <pig.classifier/>
       </properties>
     </profile>
     <profile>


Mime
View raw message