parquet-commits mailing list archives

From jul...@apache.org
Subject [1/4] parquet-mr git commit: PARQUET-77: ByteBuffer use in read and write paths
Date Wed, 04 Nov 2015 17:57:33 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 5a45ae3b1 -> 6b605a4ea


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 0a0b316..2eab54a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -42,6 +42,7 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 
 class ColumnChunkPageWriteStore implements PageWriteStore {
   private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
@@ -65,10 +66,14 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     private Set<Encoding> encodings = new HashSet<Encoding>();
 
     private Statistics totalStatistics;
+    private final ByteBufferAllocator allocator;
 
-    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) {
+    private ColumnChunkPageWriter(ColumnDescriptor path,
+                                  BytesCompressor compressor,
+                                  ByteBufferAllocator allocator) {
       this.path = path;
       this.compressor = compressor;
+      this.allocator = allocator;
       this.buf = new ConcatenatingByteArrayCollector();
       this.totalStatistics = getStatsBasedOnType(this.path.getType());
     }
@@ -84,14 +89,14 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       if (uncompressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
             "Cannot write page larger than Integer.MAX_VALUE bytes: " +
-            uncompressedSize);
+                uncompressedSize);
       }
       BytesInput compressedBytes = compressor.compress(bytes);
       long compressedSize = compressedBytes.size();
       if (compressedSize > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
             "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
-            + compressedSize);
+                + compressedSize);
       }
       tempOutputStream.reset();
       parquetMetadataConverter.writeDataPageHeader(
@@ -151,10 +156,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       // we only allocate one buffer to copy into instead of multiple.
       buf.collect(
           BytesInput.concat(
-            BytesInput.from(tempOutputStream),
-            repetitionLevels,
-            definitionLevels,
-            compressedData)
+              BytesInput.from(tempOutputStream),
+              repetitionLevels,
+              definitionLevels,
+              compressedData)
       );
       encodings.add(dataEncoding);
     }
@@ -163,7 +168,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       if (size > Integer.MAX_VALUE) {
         throw new ParquetEncodingException(
             "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
-            size);
+                size);
       }
       return (int)size;
     }
@@ -186,10 +191,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
             String.format(
                 "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
                 buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
-            + (dictionaryPage != null ? String.format(
-                    ", dic { %,d entries, %,dB raw, %,dB comp}",
-                    dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
-                    : ""));
+                + (dictionaryPage != null ? String.format(
+                ", dic { %,d entries, %,dB raw, %,dB comp}",
+                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
+                : ""));
       }
       encodings.clear();
       pageCount = 0;
@@ -215,15 +220,16 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     public String memUsageString(String prefix) {
       return buf.memUsageString(prefix + " ColumnChunkPageWriter");
     }
+
   }
 
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
   private final MessageType schema;
 
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path,  new ColumnChunkPageWriter(path, compressor, pageSize));
+      writers.put(path,  new ColumnChunkPageWriter(path, compressor, allocator));
     }
   }
 

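A minimal sketch of how a caller wires the reworked constructor, mirroring the HeapByteBufferAllocator usage in the tests later in this patch; the helper class and method below are illustrative only and not part of the commit:

package org.apache.parquet.hadoop;

import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.schema.MessageType;

// Hypothetical helper: shows that the page size is no longer passed to the
// store and that buffer management is delegated to the allocator instead.
class PageWriteStoreWiring {
  static ColumnChunkPageWriteStore newStore(BytesCompressor compressor, MessageType schema) {
    return new ColumnChunkPageWriteStore(compressor, schema, new HeapByteBufferAllocator());
  }
}
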
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
new file mode 100644
index 0000000..bb711da
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet.hadoop;
+
+
+
+import java.lang.reflect.Method;
+import java.lang.reflect.InvocationTargetException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.pool.BasePoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.xerial.snappy.Snappy;
+
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.Log;
+import org.apache.parquet.ParquetRuntimeException;
+import org.apache.parquet.Preconditions;
+
+/**
+ * Factory to produce compressors and decompressors that operate on java
+ * direct memory, without requiring a copy into heap memory (where possible).
+ */
+class DirectCodecFactory extends CodecFactory implements AutoCloseable {
+  private static final Log LOG = Log.getLog(DirectCodecFactory.class);
+
+  private final ByteBufferAllocator allocator;
+
+  // Any of these can be null depending on the version of hadoop on the classpath
+  private static final Class<?> DIRECT_DECOMPRESSION_CODEC_CLASS;
+  private static final Method DECOMPRESS_METHOD;
+  private static final Method CREATE_DIRECT_DECOMPRESSOR_METHOD;
+
+  static {
+    Class<?> tempClass = null;
+    Method tempCreateMethod = null;
+    Method tempDecompressMethod = null;
+    try {
+      tempClass = Class.forName("org.apache.hadoop.io.compress.DirectDecompressionCodec");
+      tempCreateMethod = tempClass.getMethod("createDirectDecompressor");
+      tempDecompressMethod = tempClass.getMethod("decompress", ByteBuffer.class, ByteBuffer.class);
+    } catch (ClassNotFoundException e) {
+      // do nothing, the class will just be assigned null
+    } catch (NoSuchMethodException e) {
+      // do nothing, the method will just be assigned null
+    }
+    DIRECT_DECOMPRESSION_CODEC_CLASS = tempClass;
+    CREATE_DIRECT_DECOMPRESSOR_METHOD = tempCreateMethod;
+    DECOMPRESS_METHOD = tempDecompressMethod;
+  }
+
+  /**
+   * See the docs on CodecFactory#createDirectCodecFactory, which is how this class
+   * is exposed publicly; that method is a pass-through to this constructor and
+   * hides the rest of this class from public access.
+   */
+  DirectCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+    super(config, pageSize);
+    Preconditions.checkNotNull(allocator, "allocator");
+    Preconditions.checkState(allocator.isDirect(),
+        "A %s requires a direct buffer allocator be provided.",
+        getClass().getSimpleName());
+    this.allocator = allocator;
+  }
+
+  private ByteBuffer ensure(ByteBuffer buffer, int size) {
+    if (buffer == null) {
+      buffer = allocator.allocate(size);
+    } else if (buffer.capacity() >= size) {
+      buffer.clear();
+    } else {
+      release(buffer);
+      buffer = allocator.allocate(size);
+    }
+    return buffer;
+  }
+
+  ByteBuffer release(ByteBuffer buffer) {
+    if (buffer != null) {
+      allocator.release(buffer);
+    }
+    return null;
+  }
+
+  @Override
+  protected BytesCompressor createCompressor(final CompressionCodecName codecName) {
+
+    CompressionCodec codec = getCodec(codecName);
+    if (codec == null) {
+      return new NoopCompressor();
+    } else if (codecName == CompressionCodecName.SNAPPY) {
+      // avoid using the default Snappy codec since it allocates direct buffers at awkward spots.
+      return new SnappyCompressor();
+    } else {
+      // TODO: create a class similar to SnappyCompressor for zlib and special-case it
+      // here as Snappy is above, since it also generates allocateDirect calls.
+      return new HeapBytesCompressor(codecName);
+    }
+  }
+
+  @Override
+  protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) {
+    CompressionCodec codec = getCodec(codecName);
+    if (codec == null) {
+      return new NoopDecompressor();
+    } else if (codecName == CompressionCodecName.SNAPPY ) {
+      return new SnappyDecompressor();
+    } else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) {
+      return new FullDirectDecompressor(codecName);
+    } else {
+      return new IndirectDecompressor(codec);
+    }
+  }
+
+  public void close() {
+    release();
+  }
+
+  /**
+   * Wrapper around legacy hadoop compressors that do not implement a direct memory
+   * based version of the decompression algorithm.
+   */
+  public class IndirectDecompressor extends BytesDecompressor {
+    private final Decompressor decompressor;
+
+    public IndirectDecompressor(CompressionCodec codec) {
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor();
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      decompressor.reset();
+      byte[] inputBytes = bytes.toByteArray();
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] output = new byte[uncompressedSize];
+      decompressor.decompress(output, 0, uncompressedSize);
+      return BytesInput.from(output);
+    }
+
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+
+      decompressor.reset();
+      byte[] inputBytes = new byte[compressedSize];
+      input.position(0);
+      input.get(inputBytes);
+      decompressor.setInput(inputBytes, 0, inputBytes.length);
+      byte[] outputBytes = new byte[uncompressedSize];
+      decompressor.decompress(outputBytes, 0, uncompressedSize);
+      output.clear();
+      output.put(outputBytes);
+    }
+
+    @Override
+    protected void release() {
+      DirectCodecPool.INSTANCE.returnDecompressor(decompressor);
+    }
+  }
+
+  /**
+   * Wrapper around new Hadoop compressors that implement a direct memory
+   * based version of a particular decompression algorithm. To maintain
+   * compatibility with Hadoop 1.x these classes that implement
+   * {@link org.apache.hadoop.io.compress.DirectDecompressionCodec}
+   * are currently retrieved and have their decompression method invoked
+   * with reflection.
+   */
+  public class FullDirectDecompressor extends BytesDecompressor {
+    private final Object decompressor;
+    private HeapBytesDecompressor extraDecompressor;
+    public FullDirectDecompressor(CompressionCodecName codecName){
+      CompressionCodec codec = getCodec(codecName);
+      this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDirectDecompressor();
+      this.extraDecompressor = new HeapBytesDecompressor(codecName);
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException {
+      return extraDecompressor.decompress(compressedBytes, uncompressedSize);
+    }
+
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      output.clear();
+      try {
+        DECOMPRESS_METHOD.invoke(decompressor, (ByteBuffer) input.limit(compressedSize), (ByteBuffer) output.limit(uncompressedSize));
+      } catch (IllegalAccessException e) {
+        throw new DirectCodecPool.ParquetCompressionCodecException(e);
+      } catch (InvocationTargetException e) {
+        throw new DirectCodecPool.ParquetCompressionCodecException(e);
+      }
+      output.position(uncompressedSize);
+    }
+
+    @Override
+    protected void release() {
+      DirectCodecPool.INSTANCE.returnDirectDecompressor(decompressor);
+      extraDecompressor.release();
+    }
+
+  }
+
+  public class NoopDecompressor extends BytesDecompressor {
+
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
+        throws IOException {
+      Preconditions.checkArgument(compressedSize == uncompressedSize,
+          "Non-compressed data did not have matching compressed and uncompressed sizes.");
+      output.clear();
+      output.put((ByteBuffer) input.duplicate().position(0).limit(compressedSize));
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      return bytes;
+    }
+
+    @Override
+    protected void release() {}
+
+  }
+
+  public class SnappyDecompressor extends BytesDecompressor {
+
+    private HeapBytesDecompressor extraDecompressor;
+    public SnappyDecompressor() {
+      this.extraDecompressor = new HeapBytesDecompressor(CompressionCodecName.SNAPPY);
+    }
+
+    @Override
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      return extraDecompressor.decompress(bytes, uncompressedSize);
+    }
+
+    @Override
+    public void decompress(ByteBuffer src, int compressedSize, ByteBuffer dst, int uncompressedSize) throws IOException {
+      dst.clear();
+      int size = Snappy.uncompress(src, dst);
+      dst.limit(size);
+    }
+
+    @Override
+    protected void release() {}
+  }
+
+  public class SnappyCompressor extends BytesCompressor {
+
+    // TODO - this outgoing buffer might be better off not being shared, this seems to
+    // only work because of an extra copy currently happening where this interface is
+    // being consumed
+    private ByteBuffer incoming;
+    private ByteBuffer outgoing;
+
+    /**
+     * Compress a given buffer of bytes
+     * @param bytes the uncompressed input
+     * @return the compressed bytes as a BytesInput
+     * @throws IOException if compression fails
+     */
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      int maxOutputSize = Snappy.maxCompressedLength((int) bytes.size());
+      ByteBuffer bufferIn = bytes.toByteBuffer();
+      outgoing = ensure(outgoing, maxOutputSize);
+      final int size;
+      if (bufferIn.isDirect()) {
+        size = Snappy.compress(bufferIn, outgoing);
+      } else {
+        // Snappy library requires buffers be direct
+        this.incoming = ensure(this.incoming, (int) bytes.size());
+        this.incoming.put(bufferIn);
+        this.incoming.flip();
+        size = Snappy.compress(this.incoming, outgoing);
+      }
+
+      return BytesInput.from(outgoing, 0, (int) size);
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.SNAPPY;
+    }
+
+    @Override
+    protected void release() {
+      outgoing = DirectCodecFactory.this.release(outgoing);
+      incoming = DirectCodecFactory.this.release(incoming);
+    }
+
+  }
+
+  public static class NoopCompressor extends BytesCompressor {
+
+    public NoopCompressor() {}
+
+    @Override
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      return bytes;
+    }
+
+    @Override
+    public CompressionCodecName getCodecName() {
+      return CompressionCodecName.UNCOMPRESSED;
+    }
+
+    @Override
+    protected void release() {}
+  }
+
+  static class DirectCodecPool {
+
+    public static final DirectCodecPool INSTANCE = new DirectCodecPool();
+
+    private final Map<CompressionCodec, CodecPool> codecs =
+        Collections.synchronizedMap(new HashMap<CompressionCodec, CodecPool>());
+    private final Map<Class<?>, GenericObjectPool> directDePools = Collections
+        .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>());
+    private final Map<Class<?>, GenericObjectPool> dePools = Collections
+        .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>());
+    private final Map<Class<?>, GenericObjectPool> cPools = Collections
+        .synchronizedMap(new HashMap<Class<?>, GenericObjectPool>());
+
+    private DirectCodecPool() {}
+
+    public class CodecPool {
+      private final GenericObjectPool compressorPool;
+      private final GenericObjectPool decompressorPool;
+      private final GenericObjectPool directDecompressorPool;
+      private final boolean supportDirectDecompressor;
+      private static final String BYTE_BUF_IMPL_NOT_FOUND_MSG =
+          "Unable to find ByteBuffer based %s for codec %s, will use a byte array based implementation instead.";
+
+      private CodecPool(final CompressionCodec codec){
+        try {
+          boolean supportDirectDecompressor = codec.getClass() == DIRECT_DECOMPRESSION_CODEC_CLASS;
+          compressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+            public Object makeObject() throws Exception {
+              return codec.createCompressor();
+            }
+          }, Integer.MAX_VALUE);
+
+          Object com = compressorPool.borrowObject();
+          if (com != null) {
+            cPools.put(com.getClass(), compressorPool);
+            compressorPool.returnObject(com);
+          } else {
+            if (Log.DEBUG) {
+              LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "compressor", codec.getClass().getName()));
+            }
+          }
+
+          decompressorPool = new GenericObjectPool(new BasePoolableObjectFactory() {
+            public Object makeObject() throws Exception {
+              return codec.createDecompressor();
+            }
+          }, Integer.MAX_VALUE);
+
+          Object decom = decompressorPool.borrowObject();
+          if (decom != null) {
+            dePools.put(decom.getClass(), decompressorPool);
+            decompressorPool.returnObject(decom);
+          } else {
+            if (Log.DEBUG) {
+              LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "decompressor", codec.getClass().getName()));
+            }
+          }
+
+          if (supportDirectDecompressor) {
+            directDecompressorPool = new GenericObjectPool(
+                new BasePoolableObjectFactory() {
+                  public Object makeObject() throws Exception {
+                    return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(DIRECT_DECOMPRESSION_CODEC_CLASS);
+                  }
+                }, Integer.MAX_VALUE);
+
+            Object ddecom = directDecompressorPool.borrowObject();
+            if (ddecom != null) {
+              directDePools.put(ddecom.getClass(), directDecompressorPool);
+              directDecompressorPool.returnObject(ddecom);
+
+            } else {
+              supportDirectDecompressor = false;
+              if (Log.DEBUG) {
+                LOG.debug(String.format(BYTE_BUF_IMPL_NOT_FOUND_MSG, "direct decompressor", codec.getClass().getName()));
+              }
+            }
+
+          } else {
+            directDecompressorPool = null;
+          }
+
+          this.supportDirectDecompressor = supportDirectDecompressor;
+        } catch (Exception e) {
+          throw new ParquetCompressionCodecException("Error creating compression codec pool.", e);
+        }
+      }
+
+      public Object borrowDirectDecompressor(){
+        Preconditions.checkArgument(supportDirectDecompressor, "Tried to get a direct Decompressor from a non-direct codec.");
+        try {
+          return directDecompressorPool.borrowObject();
+        } catch (Exception e) {
+          throw new ParquetCompressionCodecException(e);
+        }
+      }
+
+      public boolean supportsDirectDecompression() {
+        return supportDirectDecompressor;
+      }
+
+      public Decompressor borrowDecompressor(){
+        return borrow(decompressorPool);
+      }
+
+      public Compressor borrowCompressor(){
+        return borrow(compressorPool);
+      }
+    }
+
+    public CodecPool codec(CompressionCodec codec){
+      CodecPool pools = codecs.get(codec);
+      if(pools == null){
+        synchronized(this){
+          pools = codecs.get(codec);
+          if(pools == null){
+            pools = new CodecPool(codec);
+            codecs.put(codec, pools);
+          }
+        }
+      }
+      return pools;
+    }
+
+    private void returnToPool(Object obj, Map<Class<?>, GenericObjectPool> pools) {
+      try {
+        GenericObjectPool pool = pools.get(obj.getClass());
+        if (pool == null) {
+          throw new IllegalStateException("Received unexpected compressor or decompressor, " +
+              "cannot be returned to any available pool: " + obj.getClass().getSimpleName());
+        }
+        pool.returnObject(obj);
+      } catch (Exception e) {
+        throw new ParquetCompressionCodecException(e);
+      }
+    }
+
+    /**
+     * Borrow an object from a pool.
+     *
+     * @param pool - the pool to borrow from, must not be null
+     * @return - an object from the pool
+     */
+    @SuppressWarnings("unchecked")
+    public <T> T borrow(GenericObjectPool pool) {
+      try {
+        return (T) pool.borrowObject();
+      } catch (Exception e) {
+        throw new ParquetCompressionCodecException(e);
+      }
+
+    }
+
+    public void returnCompressor(Compressor compressor) {
+      returnToPool(compressor, cPools);
+    }
+
+    public void returnDecompressor(Decompressor decompressor) {
+      returnToPool(decompressor, dePools);
+    }
+
+    public void returnDirectDecompressor(Object decompressor) {
+      returnToPool(decompressor, directDePools);
+    }
+
+    public static class ParquetCompressionCodecException extends ParquetRuntimeException {
+
+      public ParquetCompressionCodecException() {
+        super();
+      }
+
+      public ParquetCompressionCodecException(String message, Throwable cause) {
+        super(message, cause);
+      }
+
+      public ParquetCompressionCodecException(String message) {
+        super(message);
+      }
+
+      public ParquetCompressionCodecException(Throwable cause) {
+        super(cause);
+      }
+    }
+  }
+}

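A usage sketch for the new factory, assuming the CodecFactory.createDirectCodecFactory entry point referenced in the javadoc above and exercised by TestDirectCodecFactory later in this patch; the wrapper class and round-trip method are illustrative only:

package org.apache.parquet.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.DirectByteBufferAllocator;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical round-trip, not part of this patch.
class DirectCodecFactoryUsage {
  static BytesInput roundTrip(byte[] raw) throws IOException {
    // a direct allocator is required by the precondition in the constructor above
    CodecFactory factory = CodecFactory.createDirectCodecFactory(
        new Configuration(), new DirectByteBufferAllocator(), 64 * 1024);
    CodecFactory.BytesCompressor compressor = factory.getCompressor(CompressionCodecName.SNAPPY);
    CodecFactory.BytesDecompressor decompressor = factory.getDecompressor(CompressionCodecName.SNAPPY);
    BytesInput compressed = compressor.compress(BytesInput.from(raw));
    // the BytesInput overload delegates to the heap-based decompressor internally
    return decompressor.decompress(compressed, raw.length);
  }
}
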
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index ab9cb3e..87b23a2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
@@ -86,7 +87,8 @@ class InternalParquetRecordWriter<T> {
       int dictionaryPageSize,
       boolean enableDictionary,
       boolean validating,
-      WriterVersion writerVersion) {
+      WriterVersion writerVersion,
+      ByteBufferAllocator allocator) {
     this.parquetFileWriter = parquetFileWriter;
     this.writeSupport = checkNotNull(writeSupport, "writeSupport");
     this.schema = schema;
@@ -97,16 +99,17 @@ class InternalParquetRecordWriter<T> {
     this.pageSize = pageSize;
     this.compressor = compressor;
     this.validating = validating;
-    this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary);
+    this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary, allocator);
     initStore();
   }
 
   private void initStore() {
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, parquetProperties.getAllocator());
     columnStore = parquetProperties.newColumnWriteStore(
         schema,
         pageStore,
-        pageSize);
+        pageSize,
+        parquetProperties.getAllocator());
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     this.recordConsumer = columnIO.getRecordWriter(columnStore);
     writeSupport.prepareForWrite(recordConsumer);
@@ -150,7 +153,9 @@ class InternalParquetRecordWriter<T> {
             max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
             recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
             );
-        if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
+        if (DEBUG) {
+          LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck));
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index f43e692..c54b2b2 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,10 +27,10 @@ import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
 
-import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.SequenceInputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -53,6 +53,11 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -185,7 +190,9 @@ public class ParquetFileReader implements Closeable {
 
     if (toRead.size() > 0) {
       // read the footers of the files that did not have a summary file
-      if (Log.INFO) LOG.info("reading another " + toRead.size() + " footers");
+      if (Log.INFO) {
+        LOG.info("reading another " + toRead.size() + " footers");
+      }
       result.addAll(readAllFootersInParallel(configuration, toRead, skipRowGroups));
     }
 
@@ -297,7 +304,7 @@ public class ParquetFileReader implements Closeable {
    * Read the footers of all the files under that path (recursively)
    * using summary files if possible
    * @param configuration the configuration to access the FS
-   * @param fileStatus the root dir
+   * @param pathStatus the root dir
    * @return all the footers
    * @throws IOException
    */
@@ -342,7 +349,9 @@ public class ParquetFileReader implements Closeable {
       if (Log.INFO) LOG.info("reading summary file: " + commonMetaDataFile);
       return readFooter(configuration, commonMetaDataFile, filter(skipRowGroups));
     } else if (fileSystem.exists(metadataFile)) {
-      if (Log.INFO) LOG.info("reading summary file: " + metadataFile);
+      if (Log.INFO) {
+        LOG.info("reading summary file: " + metadataFile);
+      }
       return readFooter(configuration, metadataFile, filter(skipRowGroups));
     } else {
       return null;
@@ -416,13 +425,17 @@ public class ParquetFileReader implements Closeable {
     FSDataInputStream f = fileSystem.open(file.getPath());
     try {
       long l = file.getLen();
-      if (Log.DEBUG) LOG.debug("File length " + l);
+      if (Log.DEBUG) {
+        LOG.debug("File length " + l);
+      }
       int FOOTER_LENGTH_SIZE = 4;
       if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
         throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)");
       }
       long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length;
-      if (Log.DEBUG) LOG.debug("reading footer index at " + footerLengthIndex);
+      if (Log.DEBUG) {
+        LOG.debug("reading footer index at " + footerLengthIndex);
+      }
 
       f.seek(footerLengthIndex);
       int footerLength = readIntLittleEndian(f);
@@ -432,7 +445,9 @@ public class ParquetFileReader implements Closeable {
         throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
       }
       long footerIndex = footerLengthIndex - footerLength;
-      if (Log.DEBUG) LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
+      if (Log.DEBUG) {
+        LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
+      }
       if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
         throw new RuntimeException("corrupted file: the footer index is not within the file");
       }
@@ -450,6 +465,7 @@ public class ParquetFileReader implements Closeable {
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
   private final FileMetaData fileMetaData;
   private final String createdBy;
+  private final ByteBufferAllocator allocator;
 
   private int currentBlock = 0;
 
@@ -480,7 +496,10 @@ public class ParquetFileReader implements Closeable {
     for (ColumnDescriptor col : columns) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
-    this.codecFactory = new CodecFactory(configuration);
+    // the page size parameter isn't meaningful when only using
+    // the codec factory to get decompressors
+    this.codecFactory = new CodecFactory(configuration, 0);
+    this.allocator = new HeapByteBufferAllocator();
   }
 
 
@@ -540,7 +559,7 @@ public class ParquetFileReader implements Closeable {
    * @author Julien Le Dem
    *
    */
-  private class Chunk extends ByteArrayInputStream {
+  private class Chunk extends ByteBufferInputStream {
 
     private final ChunkDescriptor descriptor;
 
@@ -550,10 +569,9 @@ public class ParquetFileReader implements Closeable {
      * @param data contains the chunk data at offset
      * @param offset where the chunk starts in offset
      */
-    public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) {
-      super(data);
+    public Chunk(ChunkDescriptor descriptor, ByteBuffer data, int offset) {
+      super(data, offset, descriptor.size);
       this.descriptor = descriptor;
-      this.pos = offset;
     }
 
     protected PageHeader readPageHeader() throws IOException {
@@ -626,7 +644,9 @@ public class ParquetFileReader implements Closeable {
             valuesCountReadSoFar += dataHeaderV2.getNum_values();
             break;
           default:
-            if (DEBUG) LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize);
+            if (DEBUG) {
+              LOG.debug("skipping page of type " + pageHeader.getType() + " of size " + compressedPageSize);
+            }
             this.skip(compressedPageSize);
             break;
         }
@@ -647,7 +667,7 @@ public class ParquetFileReader implements Closeable {
      * @return the current position in the chunk
      */
     public int pos() {
-      return this.pos;
+      return this.byteBuf.position();
     }
 
     /**
@@ -656,8 +676,9 @@ public class ParquetFileReader implements Closeable {
      * @throws IOException
      */
     public BytesInput readAsBytesInput(int size) throws IOException {
-      final BytesInput r = BytesInput.from(this.buf, this.pos, size);
-      this.pos += size;
+      int pos = this.byteBuf.position();
+      final BytesInput r = BytesInput.from(this.byteBuf, pos, size);
+      this.byteBuf.position(pos + size);
       return r;
     }
 
@@ -675,18 +696,18 @@ public class ParquetFileReader implements Closeable {
 
     /**
      * @param descriptor the descriptor of the chunk
-     * @param data contains the data of the chunk at offset
+     * @param byteBuf contains the data of the chunk at offset
      * @param offset where the chunk starts in data
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, byte[] data, int offset, FSDataInputStream f) {
-      super(descriptor, data, offset);
+    private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, FSDataInputStream f) {
+      super(descriptor, byteBuf, offset);
       this.f = f;
     }
 
     protected PageHeader readPageHeader() throws IOException {
       PageHeader pageHeader;
-      int initialPos = this.pos;
+      int initialPos = pos();
       try {
         pageHeader = Util.readPageHeader(this);
       } catch (IOException e) {
@@ -695,7 +716,7 @@ public class ParquetFileReader implements Closeable {
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.pos = initialPos; // resetting the buffer to the position before we got the error
+        this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
         LOG.info("completing the column chunk to read the page header");
         pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
       }
@@ -703,12 +724,12 @@ public class ParquetFileReader implements Closeable {
     }
 
     public BytesInput readAsBytesInput(int size) throws IOException {
-      if (pos + size > count) {
+      if (pos() + size > initPos + count) {
         // this is to workaround a bug where the compressedLength
         // of the chunk is missing the size of the header of the dictionary
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
-        int l1 = count - pos;
+        int l1 = initPos + count - pos();
         int l2 = size - l1;
         LOG.info("completed the column chunk with " + l2 + " bytes");
         return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
@@ -784,18 +805,18 @@ public class ParquetFileReader implements Closeable {
     public List<Chunk> readAll(FSDataInputStream f) throws IOException {
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
-      byte[] chunksBytes = new byte[length];
-      f.readFully(chunksBytes);
+      ByteBuffer chunksByteBuffer = allocator.allocate(length);
+      CompatibilityUtil.getBuf(f, chunksByteBuffer, length);
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
       int currentChunkOffset = 0;
       for (int i = 0; i < chunks.size(); i++) {
         ChunkDescriptor descriptor = chunks.get(i);
         if (i < chunks.size() - 1) {
-          result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset));
+          result.add(new Chunk(descriptor, chunksByteBuffer, currentChunkOffset));
         } else {
           // because of a bug, the last chunk might be larger than descriptor.size
-          result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f));
+          result.add(new WorkaroundChunk(descriptor, chunksByteBuffer, currentChunkOffset, f));
         }
         currentChunkOffset += descriptor.size;
       }

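For reference, a standalone sketch (plain java.nio, no Parquet types) of the position-based slicing that Chunk.readAsBytesInput now performs against its backing ByteBuffer instead of advancing an int offset into a byte[]:

import java.nio.ByteBuffer;

// Illustrative only: hand out a view of the next `size` bytes and advance the
// source position, the same bookkeeping readAsBytesInput does before wrapping
// the range with BytesInput.from(byteBuf, pos, size).
class ByteBufferSliceSketch {
  static ByteBuffer nextSlice(ByteBuffer buf, int size) {
    int pos = buf.position();
    ByteBuffer view = buf.duplicate();
    view.position(pos);
    view.limit(pos + size);
    buf.position(pos + size);
    return view.slice();
  }
}
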
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 664ee9d..8683a18 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -73,8 +73,9 @@ public class ParquetFileWriter {
   private static ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
 
   public static final String PARQUET_METADATA_FILE = "_metadata";
+  public static final String MAGIC_STR = "PAR1";
+  public static final byte[] MAGIC = MAGIC_STR.getBytes(Charset.forName("ASCII"));
   public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
-  public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
   public static final int CURRENT_VERSION = 1;
 
   // need to supply a buffer size when setting block size. this is the default

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index ad6c034..562bffc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -341,7 +341,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         throws IOException, InterruptedException {
     final WriteSupport<T> writeSupport = getWriteSupport(conf);
 
-    CodecFactory codecFactory = new CodecFactory(conf);
     long blockSize = getLongBlockSize(conf);
     if (INFO) LOG.info("Parquet block size to " + blockSize);
     int pageSize = getPageSize(conf);
@@ -357,6 +356,8 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     int maxPaddingSize = getMaxPaddingSize(conf);
     if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
 
+    CodecFactory codecFactory = new CodecFactory(conf, pageSize);
+
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(
         conf, init.getSchema(), file, Mode.CREATE, blockSize, maxPaddingSize);
@@ -379,7 +380,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         init.getSchema(),
         init.getExtraMetaData(),
         blockSize, pageSize,
-        codecFactory.getCompressor(codec, pageSize),
+        codecFactory.getCompressor(codec),
         dictionaryPageSize,
         enableDictionary,
         validating,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
index 2449192..eefb257 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,9 +20,11 @@ package org.apache.parquet.hadoop;
 
 import java.io.IOException;
 import java.util.Map;
+
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -70,7 +72,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       WriterVersion writerVersion) {
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
         extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
-        validating, writerVersion);
+      validating, writerVersion, new HeapByteBufferAllocator());
   }
 
   /**
@@ -98,8 +100,8 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       WriterVersion writerVersion,
       MemoryManager memoryManager) {
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
-        validating, writerVersion);
+      extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
+      validating, writerVersion, new HeapByteBufferAllocator());
     this.memoryManager = checkNotNull(memoryManager, "memoryManager");
     memoryManager.addWriter(internalWriter, blockSize);
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index e3b7953..e2521fb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -29,6 +29,7 @@ import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 /**
  * Write records to a Parquet file.
@@ -267,8 +268,8 @@ public class ParquetWriter<T> implements Closeable {
         conf, schema, file, mode, blockSize, maxPaddingSize);
     fileWriter.start();
 
-    CodecFactory codecFactory = new CodecFactory(conf);
-    CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(compressionCodecName, 0);
+    CodecFactory codecFactory = new CodecFactory(conf, pageSize);
+    CodecFactory.BytesCompressor compressor =	codecFactory.getCompressor(compressionCodecName);
     this.writer = new InternalParquetRecordWriter<T>(
         fileWriter,
         writeSupport,
@@ -280,7 +281,8 @@ public class ParquetWriter<T> implements Closeable {
         dictionaryPageSize,
         enableDictionary,
         validating,
-        writerVersion);
+        writerVersion,
+        new HeapByteBufferAllocator());
   }
 
   public void write(T object) throws IOException {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 8631267..66e3b81 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -147,4 +147,5 @@ public class SnappyDecompressor implements Decompressor {
   public void setDictionary(byte[] b, int off, int len) {
     // No-op		
   }
-}
+
+} //class SnappyDecompressor

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
new file mode 100644
index 0000000..bacf222
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.ShouldNeverHappenException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+public class CompatibilityUtil {
+
+  // Will be set to true if the implementation of FSDataInputStream supports
+  // the 2.x APIs, in particular reading using a provided ByteBuffer
+  private static boolean useV21;
+  public static final V21FileAPI fileAPI;
+
+  private static class V21FileAPI {
+    private final Method PROVIDE_BUF_READ_METHOD;
+    private final Class<?> FSDataInputStreamCls;
+
+    private V21FileAPI() throws ReflectiveOperationException {
+      final String PACKAGE = "org.apache.hadoop";
+      FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream");
+      PROVIDE_BUF_READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBuffer.class);
+    }
+  }
+  
+  static {
+    // Test to see if a class from the Hadoop 2.x API is available
+    boolean v21 = true;
+    try {
+      Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
+    } catch (ClassNotFoundException cnfe) {
+      v21 = false;
+    }
+
+    useV21 = v21;
+    try {
+      if (v21) {
+        fileAPI = new V21FileAPI();
+      } else {
+        fileAPI = null;
+      }
+
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalArgumentException("Error finding appropriate interfaces using reflection.", e);
+    }
+  }
+
+  private static Object invoke(Method method, String errorMsg, Object instance, Object... args) {
+    try {
+      return method.invoke(instance, args);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(errorMsg, e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException(errorMsg, e);
+    }
+  }
+
+  public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
+    int res;
+    if (useV21) {
+      try {
+        res = (Integer) fileAPI.PROVIDE_BUF_READ_METHOD.invoke(f, readBuf);
+      } catch (InvocationTargetException e) {
+        if (e.getCause() instanceof UnsupportedOperationException) {
+          // the FSDataInputStream docs say specifically that implementations
+          // can choose to throw UnsupportedOperationException, so this should
+          // be a reasonable check to make to see if the interface is
+          // present but not implemented and we should be falling back
+          useV21 = false;
+          return getBuf(f, readBuf, maxSize);
+        } else if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        } else {
+          // Handle any case where a runtime exception occurs and provide some
+          // additional context. A stack trace alone would just give a line
+          // number; this at least says we were using the version of the read
+          // method designed for a ByteBuffer.
+          throw new IOException("Error reading out of an FSDataInputStream " +
+              "using the Hadoop 2 ByteBuffer based read method.", e.getCause());
+        }
+      } catch (IllegalAccessException e) {
+        // This method is public because it is defined in an interface,
+        // there should be no problems accessing it
+        throw new ShouldNeverHappenException(e);
+      }
+    } else {
+      byte[] buf = new byte[maxSize];
+      res = f.read(buf);
+      readBuf.put(buf, 0, res);
+    }
+    return res;
+  }
+}

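A minimal sketch of how the reader side uses this compatibility shim, matching the readAll change in ParquetFileReader above; the allocator is assumed to be any ByteBufferAllocator (the reader itself constructs a HeapByteBufferAllocator):

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.hadoop.util.CompatibilityUtil;

// Illustrative only: read `length` bytes starting at `offset` into an
// allocator-provided buffer, using the Hadoop 2 ByteBuffer read when available
// and the byte[] fallback otherwise (both handled inside CompatibilityUtil.getBuf).
class ChunkReadSketch {
  static ByteBuffer readChunk(FSDataInputStream in, ByteBufferAllocator allocator,
                              long offset, int length) throws IOException {
    ByteBuffer buf = allocator.allocate(length);
    in.seek(offset);
    CompatibilityUtil.getBuf(in, buf, length);
    return buf;
  }
}
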
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index 2c644b6..87574cd 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -61,6 +61,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.parquet.schema.Types;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 public class TestColumnChunkPageWriteStore {
 
@@ -101,7 +102,7 @@ public class TestColumnChunkPageWriteStore {
       writer.start();
       writer.startBlock(rowCount);
       {
-        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , initialSize);
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator());
         PageWriter pageWriter = store.getPageWriter(col);
         pageWriter.writePageV2(
             rowCount, nullCount, valueCount,
@@ -158,8 +159,10 @@ public class TestColumnChunkPageWriteStore {
     int fakeCount = 3;
     BinaryStatistics fakeStats = new BinaryStatistics();
 
+    // TODO - look back at this, an allocator was being passed here in the ByteBuffer changes
+    // see comment at this constructor
     ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
-        compressor(UNCOMPRESSED), schema, initialSize);
+        compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator());
 
     for (ColumnDescriptor col : schema.getColumns()) {
       PageWriter pageWriter = store.getPageWriter(col);
@@ -176,6 +179,6 @@ public class TestColumnChunkPageWriteStore {
   }
 
   private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) {
-    return new CodecFactory(conf).getCompressor(codec, pageSize);
+    return new CodecFactory(conf, pageSize).getCompressor(codec);
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
new file mode 100644
index 0000000..caf2ed6
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class TestDirectCodecFactory {
+
+  private enum Decompression {
+    ON_HEAP, OFF_HEAP, OFF_HEAP_BYTES_INPUT
+  }
+
+  private final int pageSize = 64 * 1024;
+
+  private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) {
+    ByteBuffer rawBuf = null;
+    ByteBuffer outBuf = null;
+    ByteBufferAllocator allocator = null;
+    try {
+      allocator = new DirectByteBufferAllocator();
+      final CodecFactory codecFactory = CodecFactory.createDirectCodecFactory(new Configuration(), allocator, pageSize);
+      rawBuf = allocator.allocate(size);
+      final byte[] rawArr = new byte[size];
+      outBuf = allocator.allocate(size * 2);
+      final Random r = new Random();
+      final byte[] random = new byte[1024];
+      int pos = 0;
+      while (pos < size) {
+        r.nextBytes(random);
+        rawBuf.put(random);
+        System.arraycopy(random, 0, rawArr, pos, random.length);
+        pos += random.length;
+      }
+      rawBuf.flip();
+
+      final DirectCodecFactory.BytesCompressor c = codecFactory.getCompressor(codec);
+      final CodecFactory.BytesDecompressor d = codecFactory.getDecompressor(codec);
+
+      final BytesInput compressed;
+      if (useOnHeapCompression) {
+        compressed = c.compress(BytesInput.from(rawArr));
+      } else {
+        compressed = c.compress(BytesInput.from(rawBuf, 0, rawBuf.remaining()));
+      }
+
+      switch (decomp) {
+        case OFF_HEAP: {
+          final ByteBuffer buf = compressed.toByteBuffer();
+          final ByteBuffer b = allocator.allocate(buf.capacity());
+          try {
+            b.put(buf);
+            b.flip();
+            d.decompress(b, (int) compressed.size(), outBuf, size);
+            for (int i = 0; i < size; i++) {
+              Assert.assertTrue("Data didn't match at " + i, outBuf.get(i) == rawBuf.get(i));
+            }
+          } finally {
+            allocator.release(b);
+          }
+          break;
+        }
+
+        case OFF_HEAP_BYTES_INPUT: {
+          final ByteBuffer buf = compressed.toByteBuffer();
+          final ByteBuffer b = allocator.allocate(buf.capacity());
+          try {
+            b.put(buf);
+            b.flip();
+            final BytesInput input = d.decompress(BytesInput.from(b, 0, b.capacity()), size);
+            Assert.assertArrayEquals(
+                String.format("While testing codec %s", codec),
+                input.toByteArray(), rawArr);
+          } finally {
+            allocator.release(b);
+          }
+          break;
+        }
+        case ON_HEAP: {
+          final byte[] buf = compressed.toByteArray();
+          final BytesInput input = d.decompress(BytesInput.from(buf), size);
+          Assert.assertArrayEquals(input.toByteArray(), rawArr);
+          break;
+        }
+      }
+    } catch (Exception e) {
+      final String msg = String.format(
+          "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d",
+          codec.name(),
+          useOnHeapCompression, decomp.name(), size);
+      System.out.println(msg);
+      throw new RuntimeException(msg, e);
+    } finally {
+      if (rawBuf != null) {
+        allocator.release(rawBuf);
+      }
+      if (outBuf != null) {
+        allocator.release(outBuf);
+      }
+    }
+  }
+
+  @Test
+  public void createDirectFactoryWithHeapAllocatorFails() {
+    String errorMsg = "Test failed, creation of a direct codec factory should have failed when passed a non-direct allocator.";
+    try {
+      CodecFactory.createDirectCodecFactory(new Configuration(), new HeapByteBufferAllocator(), 0);
+      throw new RuntimeException(errorMsg);
+    } catch (IllegalStateException ex) {
+      // indicates successful completion of the test
+      Assert.assertTrue("Missing expected error message.",
+          ex.getMessage()
+          .contains("A DirectCodecFactory requires a direct buffer allocator be provided.")
+      );
+    } catch (Exception ex) {
+      throw new RuntimeException(errorMsg + " Failed with the wrong error.", ex);
+    }
+  }
+
+  @Test
+  public void compressionCodecs() throws Exception {
+    final int[] sizes = { 4 * 1024, 1 * 1024 * 1024 };
+    final boolean[] comp = { true, false };
+
+    for (final int size : sizes) {
+      for (final boolean useOnHeapComp : comp) {
+        for (final Decompression decomp : Decompression.values()) {
+          for (final CompressionCodecName codec : CompressionCodecName.values()) {
+            if (codec == CompressionCodecName.LZO) {
+              // not installed as gpl.
+              continue;
+            }
+            test(size, codec, useOnHeapComp, decomp);
+          }
+        }
+      }
+    }
+  }
+}
+
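
The new test exercises the direct (off-heap) codec factory end to end. A condensed round-trip sketch of the same API, with illustrative class name, buffer sizes, and GZIP chosen only because it needs no extra native library:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.bytes.BytesInput;
  import org.apache.parquet.bytes.DirectByteBufferAllocator;
  import org.apache.parquet.hadoop.CodecFactory;
  import org.apache.parquet.hadoop.metadata.CompressionCodecName;

  public class DirectCodecRoundTripSketch {
    public static void main(String[] args) throws Exception {
      int pageSize = 64 * 1024;
      // A direct allocator is required; passing a heap allocator makes
      // createDirectCodecFactory fail, as the second test above verifies.
      CodecFactory factory = CodecFactory.createDirectCodecFactory(
          new Configuration(), new DirectByteBufferAllocator(), pageSize);

      byte[] raw = new byte[8 * 1024];  // all zeros is enough for a round-trip check
      CompressionCodecName codec = CompressionCodecName.GZIP;

      BytesInput compressed = factory.getCompressor(codec).compress(BytesInput.from(raw));
      BytesInput decompressed = factory.getDecompressor(codec)
          .decompress(BytesInput.from(compressed.toByteArray()), raw.length);

      System.out.println(java.util.Arrays.equals(raw, decompressed.toByteArray()));
    }
  }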

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
index 5e1f5af..c050922 100644
--- a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
+++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Level;
 
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.NonSpillableDataBag;
@@ -59,7 +60,7 @@ public class TupleConsumerPerfTest {
     MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema));
 
     MemPageStore memPageStore = new MemPageStore(0);
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
     write(memPageStore, columns, schema, pigSchema);
     columns.flush();
     read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString);
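
As in the thrift test below, the only change for ColumnWriteStoreV1 callers is the extra allocator argument appended to the existing parameter list. A minimal sketch, with the import locations assumed from the test layout and illustrative sizes:

  import org.apache.parquet.bytes.HeapByteBufferAllocator;
  import org.apache.parquet.column.ParquetProperties.WriterVersion;
  import org.apache.parquet.column.impl.ColumnWriteStoreV1;
  import org.apache.parquet.column.page.mem.MemPageStore;

  public class WriteStoreAllocatorSketch {
    static ColumnWriteStoreV1 newWriteStore() {
      MemPageStore pageStore = new MemPageStore(0);  // in-memory page store used by the tests
      int pageSizeThreshold = 50 * 1024 * 1024;
      int dictionaryPageSize = 50 * 1024 * 1024;
      boolean enableDictionary = false;
      // HeapByteBufferAllocator keeps the old on-heap behaviour; the rest of the
      // arguments are unchanged from the pre-patch constructor.
      return new ColumnWriteStoreV1(pageStore, pageSizeThreshold, dictionaryPageSize,
          enableDictionary, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
    }
  }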

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
index f5f3ff1..f954e4c 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import thrift.test.OneOfEach;
 
 import org.apache.thrift.TBase;
@@ -148,7 +149,8 @@ public class TestParquetReadProtocol {
     final MessageType schema = schemaConverter.convert(thriftClass);
     LOG.info(schema);
     final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
-    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false, WriterVersion.PARQUET_1_0);
+    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false,
+        WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
     final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     final StructType thriftType = schemaConverter.toStructType(thriftClass);
     ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index dc27f4c..14feb9c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,7 +88,7 @@
     <scala.binary.version>2.10</scala.binary.version>
     <scala.maven.test.skip>false</scala.maven.test.skip>
     <pig.version>0.11.1</pig.version>
-    <pig.classifier />
+    <pig.classifier/>
     <thrift.version>0.7.0</thrift.version>
     <fastutil.version>6.5.7</fastutil.version>
     <semver.api.version>0.9.33</semver.api.version>
@@ -225,6 +225,7 @@
                      <exclude>org/apache/parquet/filter2/**</exclude>
                      <exclude>org/apache/parquet/column/**</exclude>
                      <exclude>org/apache/parquet/hadoop/ParquetInputSplit</exclude>
+                     <exclude>org/apache/parquet/hadoop/CodecFactory**</exclude>
                      <exclude>shaded/**</exclude> <!-- shaded by parquet -->
                      <!-- temporary exclusions for false-positives -->
                      <exclude>org/apache/parquet/Version</exclude>

