parquet-commits mailing list archives

From b...@apache.org
Subject [12/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:09 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
new file mode 100644
index 0000000..6253c99
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -0,0 +1,272 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Write records to a Parquet file.
+ */
+public class ParquetWriter<T> implements Closeable {
+
+  public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
+  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+  public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
+      CompressionCodecName.UNCOMPRESSED;
+  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+  public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
+  public static final WriterVersion DEFAULT_WRITER_VERSION =
+      WriterVersion.PARQUET_1_0;
+
+  private final InternalParquetRecordWriter<T> writer;
+
+  /**
+   * Create a new ParquetWriter.
+   * (with dictionary encoding enabled and validation off)
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @throws IOException
+   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, boolean, boolean)
+   */
+  public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold (both data and dictionary)
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @throws IOException
+   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      boolean enableDictionary,
+      boolean validating) throws IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @param dictionaryPageSize the page size threshold for the dictionary pages
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @throws IOException
+   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, WriterVersion)
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating) throws IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+        dictionaryPageSize, enableDictionary, validating,
+        DEFAULT_WRITER_VERSION);
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
+   * configuration from the classpath.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @param dictionaryPageSize the page size threshold for the dictionary pages
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
+   * @throws IOException
+   * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, WriterVersion, Configuration)
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion) throws IOException {
+    this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
+  }
+
+  /**
+   * Create a new ParquetWriter.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @param dictionaryPageSize the page size threshold for the dictionary pages
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
+   * @param conf Hadoop configuration to use while accessing the filesystem
+   * @throws IOException
+   */
+  public ParquetWriter(
+      Path file,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion,
+      Configuration conf) throws IOException {
+    this(file, ParquetFileWriter.Mode.CREATE, writeSupport,
+        compressionCodecName, blockSize, pageSize, dictionaryPageSize,
+        enableDictionary, validating, writerVersion, conf);
+  }
+  /**
+   * Create a new ParquetWriter.
+   *
+   * @param file the file to create
+   * @param mode file creation mode
+   * @param writeSupport the implementation to write a record to a RecordConsumer
+   * @param compressionCodecName the compression codec to use
+   * @param blockSize the block size threshold
+   * @param pageSize the page size threshold
+   * @param dictionaryPageSize the page size threshold for the dictionary pages
+   * @param enableDictionary to turn dictionary encoding on
+   * @param validating to turn on validation using the schema
+   * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
+   * @param conf Hadoop configuration to use while accessing the filesystem
+   * @throws IOException
+   */
+  public ParquetWriter(
+      Path file,
+      ParquetFileWriter.Mode mode,
+      WriteSupport<T> writeSupport,
+      CompressionCodecName compressionCodecName,
+      int blockSize,
+      int pageSize,
+      int dictionaryPageSize,
+      boolean enableDictionary,
+      boolean validating,
+      WriterVersion writerVersion,
+      Configuration conf) throws IOException {
+
+    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
+    MessageType schema = writeContext.getSchema();
+
+    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file,
+        mode);
+    fileWriter.start();
+
+    CodecFactory codecFactory = new CodecFactory(conf);
+    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
+    this.writer = new InternalParquetRecordWriter<T>(
+        fileWriter,
+        writeSupport,
+        schema,
+        writeContext.getExtraMetaData(),
+        blockSize,
+        pageSize,
+        compressor,
+        dictionaryPageSize,
+        enableDictionary,
+        validating,
+        writerVersion);
+  }
+
+  /**
+   * Create a new ParquetWriter.  The default block size is 128 MB.  The default
+   * page size is 1 MB.  Default compression is no compression.  Dictionary encoding is enabled.
+   *
+   * @param file the file to create
+   * @param writeSupport the implementation to write a record to a RecordConsumer
+   * @throws IOException
+   */
+  public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
+    this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+  }
+
+  public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport) throws IOException {
+    this(file,
+        writeSupport,
+        DEFAULT_COMPRESSION_CODEC_NAME,
+        DEFAULT_BLOCK_SIZE,
+        DEFAULT_PAGE_SIZE,
+        DEFAULT_PAGE_SIZE,
+        DEFAULT_IS_DICTIONARY_ENABLED,
+        DEFAULT_IS_VALIDATING_ENABLED,
+        DEFAULT_WRITER_VERSION,
+        conf);
+  }
+
+  public void write(T object) throws IOException {
+    try {
+      writer.write(object);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      writer.close();
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+}
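
For reference, a minimal usage sketch of the writer API above. ExampleWriteSupport and MyRecord are placeholders for a concrete WriteSupport implementation and record type (they are not part of this commit); the block and page sizes are just the defaults made explicit.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetWriterExample {
  // ExampleWriteSupport (a WriteSupport<MyRecord>) and MyRecord are hypothetical stand-ins.
  public static void writeAll(Path file, List<MyRecord> records) throws IOException {
    ParquetWriter<MyRecord> writer = new ParquetWriter<MyRecord>(
        file,
        new ExampleWriteSupport(),
        CompressionCodecName.SNAPPY,
        ParquetWriter.DEFAULT_BLOCK_SIZE,  // 128 MB row group target
        ParquetWriter.DEFAULT_PAGE_SIZE);  // 1 MB page target
    try {
      for (MyRecord record : records) {
        writer.write(record);              // InterruptedException is rethrown as IOException
      }
    } finally {
      writer.close();                      // flushes the last row group and writes the footer
    }
  }
}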

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java
new file mode 100644
index 0000000..5a3c6f5
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/PrintFooter.java
@@ -0,0 +1,271 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Utility to print footer information
+ * @author Julien Le Dem
+ *
+ */
+public class PrintFooter {
+
+  public static void main(String[] args) throws Exception {
+    if (args.length != 1) {
+      System.err.println("usage PrintFooter <path>");
+      return;
+    }
+    Path path = new Path(new URI(args[0]));
+    final Configuration configuration = new Configuration();
+
+    final FileSystem fs = path.getFileSystem(configuration);
+    FileStatus fileStatus = fs.getFileStatus(path);
+    Path summary = new Path(fileStatus.getPath(), PARQUET_METADATA_FILE);
+    if (fileStatus.isDir() && fs.exists(summary)) {
+      System.out.println("reading summary file");
+      FileStatus summaryStatus = fs.getFileStatus(summary);
+      List<Footer> readSummaryFile = ParquetFileReader.readSummaryFile(configuration, summaryStatus);
+      for (Footer footer : readSummaryFile) {
+        add(footer.getParquetMetadata());
+      }
+    } else {
+      List<FileStatus> statuses;
+      if (fileStatus.isDir()) {
+        System.out.println("listing files in " + fileStatus.getPath());
+        statuses = Arrays.asList(fs.listStatus(fileStatus.getPath(), HiddenFileFilter.INSTANCE));
+      } else {
+        statuses = new ArrayList<FileStatus>();
+        statuses.add(fileStatus);
+      }
+      System.out.println("opening " + statuses.size() + " files");
+      int i = 0;
+      ExecutorService threadPool = Executors.newFixedThreadPool(5);
+      try {
+        long t0 = System.currentTimeMillis();
+        Deque<Future<ParquetMetadata>> footers = new LinkedBlockingDeque<Future<ParquetMetadata>>();
+        for (final FileStatus currentFile : statuses) {
+          footers.add(threadPool.submit(new Callable<ParquetMetadata>() {
+            @Override
+            public ParquetMetadata call() throws Exception {
+              try {
+                ParquetMetadata footer = ParquetFileReader.readFooter(configuration, currentFile, NO_FILTER);
+                return footer;
+              } catch (Exception e) {
+                throw new ParquetDecodingException("could not read footer", e);
+              }
+            }
+          }));
+        }
+        int previousPercent = 0;
+        int n = 60;
+        System.out.print("0% [");
+        for (int j = 0; j < n; j++) {
+          System.out.print(" ");
+
+        }
+        System.out.print("] 100%");
+        for (int j = 0; j < n + 6; j++) {
+          System.out.print('\b');
+        }
+        while (!footers.isEmpty()) {
+          Future<ParquetMetadata> futureFooter = footers.removeFirst();
+          if (!futureFooter.isDone()) {
+            footers.addLast(futureFooter);
+            continue;
+          }
+          ParquetMetadata footer = futureFooter.get();
+          int currentPercent = (++i * n / statuses.size());
+          while (currentPercent > previousPercent) {
+            System.out.print("*");
+            previousPercent ++;
+          }
+          add(footer);
+        }
+        System.out.println("");
+        long t1 = System.currentTimeMillis();
+        System.out.println("read all footers in " + (t1 - t0) + " ms");
+      } finally {
+        threadPool.shutdownNow();
+      }
+    }
+    Set<Entry<ColumnDescriptor, ColStats>> entries = stats.entrySet();
+    long total = 0;
+    long totalUnc = 0;
+    for (Entry<ColumnDescriptor, ColStats> entry : entries) {
+      ColStats colStats = entry.getValue();
+      total += colStats.allStats.total;
+      totalUnc += colStats.uncStats.total;
+    }
+
+    for (Entry<ColumnDescriptor, ColStats> entry : entries) {
+      ColStats colStats = entry.getValue();
+      System.out.println(entry.getKey() +" " + percent(colStats.allStats.total, total) + "% of all space " + colStats);
+    }
+
+    System.out.println("number of blocks: " + blockCount);
+    System.out.println("total data size: " + humanReadable(total) + " (raw " + humanReadable(totalUnc) + ")");
+    System.out.println("total record: " + humanReadable(recordCount));
+    System.out.println("average block size: " + humanReadable(total/blockCount) + " (raw " + humanReadable(totalUnc/blockCount) + ")");
+    System.out.println("average record count: " + humanReadable(recordCount/blockCount));
+  }
+
+  private static void add(ParquetMetadata footer) {
+    for (BlockMetaData blockMetaData : footer.getBlocks()) {
+      ++ blockCount;
+      MessageType schema = footer.getFileMetaData().getSchema();
+      recordCount += blockMetaData.getRowCount();
+      List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+      for (ColumnChunkMetaData columnMetaData : columns) {
+        ColumnDescriptor desc = schema.getColumnDescription(columnMetaData.getPath().toArray());
+        add(
+            desc,
+            columnMetaData.getValueCount(),
+            columnMetaData.getTotalSize(),
+            columnMetaData.getTotalUncompressedSize(),
+            columnMetaData.getEncodings(),
+            columnMetaData.getStatistics());
+      }
+    }
+  }
+
+  private static void printTotalString(String message, long total, long totalUnc) {
+    System.out.println("total "+message+": " + humanReadable(total) + " (raw "+humanReadable(totalUnc)+" saved "+percentComp(totalUnc, total)+"%)");
+  }
+
+  private static float percentComp(long raw, long compressed) {
+    return percent(raw - compressed, raw);
+  }
+
+  private static float percent(long numerator, long denominator) {
+    return ((float)((numerator)*1000/denominator))/10;
+  }
+
+  private static String humanReadable(long size) {
+    if (size < 1000) {
+      return String.valueOf(size);
+    }
+    long currentSize = size;
+    long previousSize = size * 1000;
+    int count = 0;
+    String[] unit = {"", "K", "M", "G", "T", "P"};
+    while (currentSize >= 1000) {
+      previousSize = currentSize;
+      currentSize = currentSize / 1000;
+      ++ count;
+    }
+    return ((float)previousSize/1000) + unit[count];
+  }
+
+  private static Map<ColumnDescriptor, ColStats> stats = new LinkedHashMap<ColumnDescriptor, ColStats>();
+  private static int blockCount = 0;
+  private static long recordCount = 0;
+
+  private static class Stats {
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    long total = 0;
+
+    public void add(long  length) {
+      min = Math.min(length, min);
+      max = Math.max(length, max);
+      total += length;
+    }
+
+    public String toString(int blocks) {
+      return
+          "min: " + humanReadable(min) +
+          " max: " + humanReadable(max) +
+          " average: " + humanReadable(total/blocks) +
+          " total: " + humanReadable(total);
+    }
+  }
+
+  private static class ColStats {
+
+    Stats valueCountStats = new Stats();
+    Stats allStats = new Stats();
+    Stats uncStats = new Stats();
+    Set<Encoding> encodings = new TreeSet<Encoding>();
+    Statistics colValuesStats = null;
+    int blocks = 0;
+
+    public void add(long valueCount, long size, long uncSize, Collection<Encoding> encodings, Statistics colValuesStats) {
+      ++blocks;
+      valueCountStats.add(valueCount);
+      allStats.add(size);
+      uncStats.add(uncSize);
+      this.encodings.addAll(encodings);
+      this.colValuesStats = colValuesStats;
+    }
+
+    @Override
+    public String toString() {
+      long raw = uncStats.total;
+      long compressed = allStats.total;
+      return encodings + " " + allStats.toString(blocks) + " (raw data: " + humanReadable(raw) + (raw == 0 ? "" : " saving " + (raw - compressed)*100/raw + "%") + ")\n"
+      + "  values: "+valueCountStats.toString(blocks) + "\n"
+      + "  uncompressed: "+uncStats.toString(blocks) + "\n"
+      + "  column values statistics: " + colValuesStats.toString();
+    }
+
+  }
+
+  private static void add(ColumnDescriptor desc, long valueCount, long size, long uncSize, Collection<Encoding> encodings, Statistics colValuesStats) {
+    ColStats colStats = stats.get(desc);
+    if (colStats == null) {
+      colStats = new ColStats();
+      stats.put(desc, colStats);
+    }
+    colStats.add(valueCount, size, uncSize, encodings, colValuesStats);
+  }
+}
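
As a usage sketch, the tool is driven entirely through main; the single path argument (a directory with a _metadata summary file, a directory of parquet files, or a single file) is the only input. The path below is illustrative.

import org.apache.parquet.hadoop.PrintFooter;

public class PrintFooterExample {
  public static void main(String[] args) throws Exception {
    // Prints per-column size and encoding statistics aggregated over all footers under the path.
    PrintFooter.main(new String[] { "hdfs://namenode/warehouse/events" });
  }
}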

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
new file mode 100644
index 0000000..3ddaa77
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingReadSupport.java
@@ -0,0 +1,62 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.api;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Helper for composing read supports
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class DelegatingReadSupport<T> extends ReadSupport<T> {
+
+  private final ReadSupport<T> delegate;
+
+  public DelegatingReadSupport(ReadSupport<T> delegate) {
+    super();
+    this.delegate = delegate;
+  }
+
+  @Override
+  public ReadSupport.ReadContext init(InitContext context) {
+    return delegate.init(context);
+  }
+
+  @Override
+  public RecordMaterializer<T> prepareForRead(
+      Configuration configuration,
+      Map<String, String> keyValueMetaData,
+      MessageType fileSchema,
+      ReadSupport.ReadContext readContext) {
+    return delegate.prepareForRead(configuration, keyValueMetaData, fileSchema, readContext);
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getName() + "(" + delegate.toString() + ")";
+  }
+}
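
A sketch of the intended composition pattern: wrap an existing ReadSupport, intercept one phase, and delegate the rest. LoggingReadSupport is an illustrative name, not part of this commit.

import org.apache.parquet.hadoop.api.DelegatingReadSupport;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;

public class LoggingReadSupport<T> extends DelegatingReadSupport<T> {

  public LoggingReadSupport(ReadSupport<T> delegate) {
    super(delegate);
  }

  @Override
  public ReadSupport.ReadContext init(InitContext context) {
    ReadSupport.ReadContext readContext = super.init(context);
    // Observe the projection chosen by the wrapped ReadSupport; prepareForRead is inherited as-is.
    System.out.println("requested schema: " + readContext.getRequestedSchema());
    return readContext;
  }
}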

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
new file mode 100644
index 0000000..207bb1a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/DelegatingWriteSupport.java
@@ -0,0 +1,66 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.api;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.io.api.RecordConsumer;
+
+/**
+ *
+ * Helper for composing write supports
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class DelegatingWriteSupport<T> extends WriteSupport<T> {
+
+  private final WriteSupport<T> delegate;
+
+  public DelegatingWriteSupport(WriteSupport<T> delegate) {
+    super();
+    this.delegate = delegate;
+  }
+
+  @Override
+  public WriteSupport.WriteContext init(Configuration configuration) {
+    return delegate.init(configuration);
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    delegate.prepareForWrite(recordConsumer);
+  }
+
+  @Override
+  public void write(T record) {
+    delegate.write(record);
+  }
+
+  @Override
+  public WriteSupport.FinalizedWriteContext finalizeWrite() {
+    return delegate.finalizeWrite();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getName() + "(" + delegate.toString() + ")";
+  }
+}
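
The write-side counterpart composes the same way; for example, a wrapper can append application metadata to the footer in finalizeWrite while delegating everything else. The class name and metadata key below are made up for illustration.

import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.hadoop.api.DelegatingWriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport;

public class FooterTaggingWriteSupport<T> extends DelegatingWriteSupport<T> {

  public FooterTaggingWriteSupport(WriteSupport<T> delegate) {
    super(delegate);
  }

  @Override
  public WriteSupport.FinalizedWriteContext finalizeWrite() {
    // Copy the delegate's metadata (it is exposed as an unmodifiable map) and add one entry.
    Map<String, String> extra = new HashMap<String, String>(
        super.finalizeWrite().getExtraMetaData());
    extra.put("example.tag", "footer-tagging-write-support"); // illustrative key/value
    return new WriteSupport.FinalizedWriteContext(extra);
  }
}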

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
new file mode 100644
index 0000000..222898e
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/InitContext.java
@@ -0,0 +1,102 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.api;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.schema.MessageType;
+
+/**
+ *
+ * Context passed to ReadSupport when initializing for read
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class InitContext {
+
+  private final Map<String,Set<String>> keyValueMetadata;
+  private Map<String,String> mergedKeyValueMetadata;
+  private final Configuration configuration;
+  private final MessageType fileSchema;
+
+  /**
+   * @param configuration the hadoop configuration
+   * @param keyValueMetadata extra metadata from file footers
+   * @param fileSchema the merged schema from the files
+   */
+  public InitContext(
+      Configuration configuration,
+      Map<String, Set<String>> keyValueMetadata,
+      MessageType fileSchema) {
+    super();
+    this.keyValueMetadata = keyValueMetadata;
+    this.configuration = configuration;
+    this.fileSchema = fileSchema;
+  }
+
+  /**
+   * If there is a conflicting value when reading from multiple files,
+   * an exception will be thrown
+   * @return the merged key/value metadata from the file footers
+   */
+  @Deprecated
+  public Map<String, String> getMergedKeyValueMetaData() {
+    if (mergedKeyValueMetadata == null) {
+      Map<String, String> mergedKeyValues = new HashMap<String, String>();
+      for (Entry<String, Set<String>> entry : keyValueMetadata.entrySet()) {
+        if (entry.getValue().size() > 1) {
+          throw new RuntimeException("could not merge metadata: key " + entry.getKey() + " has conflicting values: " + entry.getValue());
+        }
+        mergedKeyValues.put(entry.getKey(), entry.getValue().iterator().next());
+      }
+      mergedKeyValueMetadata = mergedKeyValues;
+    }
+    return mergedKeyValueMetadata;
+  }
+
+  /**
+   * @return the configuration for this job
+   */
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  /**
+   * this is the union of all the schemas when reading multiple files.
+   * @return the schema of the files being read
+   */
+  public MessageType getFileSchema() {
+    return fileSchema;
+  }
+
+  /**
+   * each key is associated with the set of distinct values found in the footers
+   * @return the key/value metadata gathered from the file footers
+   */
+  public Map<String, Set<String>> getKeyValueMetadata() {
+    return keyValueMetadata;
+  }
+
+}
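
A small sketch of the merge behavior of the (deprecated) getMergedKeyValueMetaData accessor above; the key, value, and schema are illustrative.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class InitContextExample {
  public static void main(String[] args) {
    Map<String, Set<String>> keyValueMetadata = new HashMap<String, Set<String>>();
    keyValueMetadata.put("example.key", Collections.singleton("v1")); // one distinct value across footers

    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message doc { required int64 id; }");
    InitContext context = new InitContext(new Configuration(), keyValueMetadata, fileSchema);

    // Succeeds because every key maps to exactly one value; a key with two
    // distinct values across footers would make this call throw a RuntimeException.
    Map<String, String> merged = context.getMergedKeyValueMetaData();
    System.out.println(merged); // {example.key=v1}
  }
}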

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
new file mode 100644
index 0000000..6d8c1fd
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/ReadSupport.java
@@ -0,0 +1,150 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.api;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+/**
+ * Abstraction used by the {@link org.apache.parquet.hadoop.ParquetInputFormat} to materialize records
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the materialized record
+ */
+abstract public class ReadSupport<T> {
+
+  /**
+   * configuration key for a parquet read projection schema
+   */
+  public static final String PARQUET_READ_SCHEMA = "parquet.read.schema";
+
+  /**
+   * attempts to validate and construct a {@link MessageType} from a read projection schema
+   *
+   * @param fileMessageType         the typed schema of the source
+   * @param partialReadSchemaString the requested projection schema
+   * @return the typed schema that should be used to read
+   */
+  public static MessageType getSchemaForRead(MessageType fileMessageType, String partialReadSchemaString) {
+    if (partialReadSchemaString == null)
+      return fileMessageType;
+    MessageType requestedMessageType = MessageTypeParser.parseMessageType(partialReadSchemaString);
+    return getSchemaForRead(fileMessageType, requestedMessageType);
+  }
+
+  public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
+    fileMessageType.checkContains(projectedMessageType);
+    return projectedMessageType;
+  }
+
+  /**
+   * called in {@link org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} in the front end
+   *
+   * @param configuration    the job configuration
+   * @param keyValueMetaData the app specific metadata from the file
+   * @param fileSchema       the schema of the file
+   * @return the readContext that defines how to read the file
+   *
+   * @deprecated override {@link ReadSupport#init(InitContext)} instead
+   */
+  @Deprecated
+  public ReadContext init(
+          Configuration configuration,
+          Map<String, String> keyValueMetaData,
+          MessageType fileSchema) {
+    throw new UnsupportedOperationException("Override init(InitContext)");
+  }
+
+  /**
+   * called in {@link org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce.JobContext)} in the front end
+   *
+   * @param context the initialisation context
+   * @return the readContext that defines how to read the file
+   */
+  public ReadContext init(InitContext context) {
+    return init(context.getConfiguration(), context.getMergedKeyValueMetaData(), context.getFileSchema());
+  }
+
+  /**
+   * called in {@link org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)} in the back end
+   * the returned RecordMaterializer will materialize the records and add them to the destination
+   *
+   * @param configuration    the job configuration
+   * @param keyValueMetaData the app specific metadata from the file
+   * @param fileSchema       the schema of the file
+   * @param readContext      returned by the init method
+   * @return the recordMaterializer that will materialize the records
+   */
+  abstract public RecordMaterializer<T> prepareForRead(
+          Configuration configuration,
+          Map<String, String> keyValueMetaData,
+          MessageType fileSchema,
+          ReadContext readContext);
+
+  /**
+   * information to read the file
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static final class ReadContext {
+    private final MessageType requestedSchema;
+    private final Map<String, String> readSupportMetadata;
+
+    /**
+     * @param requestedSchema the schema requested by the user. Can not be null.
+     */
+    public ReadContext(MessageType requestedSchema) {
+      this(requestedSchema, null);
+    }
+
+    /**
+     * @param requestedSchema the schema requested by the user. Can not be null.
+     * @param readSupportMetadata metadata specific to the ReadSupport implementation. Will be available in the prepareForRead phase.
+     */
+    public ReadContext(MessageType requestedSchema, Map<String, String> readSupportMetadata) {
+      super();
+      if (requestedSchema == null) {
+        throw new NullPointerException("requestedSchema");
+      }
+      this.requestedSchema = requestedSchema;
+      this.readSupportMetadata = readSupportMetadata;
+    }
+
+    /**
+     * @return the schema of the file
+     */
+    public MessageType getRequestedSchema() {
+      return requestedSchema;
+    }
+
+    /**
+     * @return metadata specific to the ReadSupport implementation
+     */
+    public Map<String, String> getReadSupportMetadata() {
+      return readSupportMetadata;
+    }
+  }
+}
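
As a sketch of the projection helper above: the requested schema must be contained in the file schema, which checkContains enforces. The schemas are illustrative. In a job, a ReadSupport implementation would typically obtain the projection string from the parquet.read.schema configuration key defined above.

import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ProjectionExample {
  public static void main(String[] args) {
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message doc { required int64 id; required binary name; optional int32 age; }");

    // Keep only the columns we need; the file schema must contain the projection.
    MessageType projection = ReadSupport.getSchemaForRead(fileSchema,
        "message doc { required int64 id; optional int32 age; }");

    System.out.println(projection); // the two requested columns
  }
}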

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
new file mode 100644
index 0000000..91c37c3
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java
@@ -0,0 +1,131 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.api;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+
+
+/**
+ * Abstraction to use with {@link org.apache.parquet.hadoop.ParquetOutputFormat} to convert incoming records
+ *
+ * @author Julien Le Dem
+ *
+ * @param <T> the type of the incoming records
+ */
+abstract public class WriteSupport<T> {
+
+  /**
+   * information to be persisted in the file
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static final class WriteContext {
+    private final MessageType schema;
+    private final Map<String, String> extraMetaData;
+
+    /**
+     * @param schema the schema of the data
+     * @param extraMetaData application specific metadata to add in the file
+     */
+    public WriteContext(MessageType schema, Map<String, String> extraMetaData) {
+      super();
+      this.schema = checkNotNull(schema, "schema");
+      this.extraMetaData = Collections.unmodifiableMap(checkNotNull(extraMetaData, "extraMetaData"));
+    }
+    /**
+     * @return the schema of the file
+     */
+    public MessageType getSchema() {
+      return schema;
+    }
+    /**
+     * @return application specific metadata
+     */
+    public Map<String, String> getExtraMetaData() {
+      return extraMetaData;
+    }
+
+  }
+
+  /**
+   * Information to be added in the file once all the records have been written
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static final class FinalizedWriteContext {
+    private final Map<String, String> extraMetaData;
+    // this class exists to facilitate evolution of the API
+    // we can add more fields later
+
+    /**
+     * @param extraMetaData application specific metadata to add in the file
+     */
+    public FinalizedWriteContext(Map<String, String> extraMetaData) {
+      super();
+      this.extraMetaData = Collections.unmodifiableMap(checkNotNull(extraMetaData, "extraMetaData"));
+    }
+
+    /**
+     * @return application specific metadata
+     */
+    public Map<String, String> getExtraMetaData() {
+      return extraMetaData;
+    }
+
+  }
+
+  /**
+   * called first in the task
+   * @param configuration the job's configuration
+   * @return the information needed to write the file
+   */
+  public abstract WriteContext init(Configuration configuration);
+
+  /**
+   * This will be called once per row group
+   * @param recordConsumer the recordConsumer to write to
+   */
+  public abstract void prepareForWrite(RecordConsumer recordConsumer);
+
+  /**
+   * called once per record
+   * @param record one record to write to the previously provided record consumer
+   */
+  public abstract void write(T record);
+
+  /**
+   * called once at the end, after the last record has been written
+   * @return information to be added in the file
+   */
+  public FinalizedWriteContext finalizeWrite() {
+    return new FinalizedWriteContext(new HashMap<String, String>());
+  }
+
+}
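
To make the contract concrete, here is a minimal WriteSupport sketch for a single int64 column that could be handed to the ParquetWriter constructors earlier in this commit. The class name and schema are illustrative; the write method follows the RecordConsumer start/end message and field protocol.

import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class LongWriteSupport extends WriteSupport<Long> {

  private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(
      "message longs { required int64 value; }"); // illustrative schema

  private RecordConsumer recordConsumer;

  @Override
  public WriteContext init(Configuration configuration) {
    // No extra footer metadata for this sketch.
    return new WriteContext(SCHEMA, new HashMap<String, String>());
  }

  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer; // called once per row group
  }

  @Override
  public void write(Long record) {
    recordConsumer.startMessage();
    recordConsumer.startField("value", 0);
    recordConsumer.addLong(record.longValue());
    recordConsumer.endField("value", 0);
    recordConsumer.endMessage();
  }
}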

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/package-info.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/package-info.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/package-info.java
new file mode 100644
index 0000000..c0b5885
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ *
+ * <p>
+ * APIs to integrate various type systems with Parquet
+ *
+ * </p>
+ */
+package org.apache.parquet.hadoop.api;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CodecConfig.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CodecConfig.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CodecConfig.java
new file mode 100644
index 0000000..9657865
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CodecConfig.java
@@ -0,0 +1,169 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.parquet.Log;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+import static org.apache.parquet.Log.INFO;
+import static org.apache.parquet.Log.WARN;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+
+/**
+ * Template class and factory for accessing codec-related configuration in the different APIs (mapreduce or mapred):
+ * use {@link #from(org.apache.hadoop.mapred.JobConf)} for the mapred API and
+ * {@link #from(org.apache.hadoop.mapreduce.TaskAttemptContext)} for the mapreduce API.
+ *
+ * @author Tianshuo Deng
+ */
+public abstract class CodecConfig {
+  private static final Log LOG = Log.getLog(CodecConfig.class);
+
+  /**
+   * @return whether a compression flag is set in the hadoop configuration
+   */
+  public abstract boolean isHadoopCompressionSet();
+
+  /**
+   * @param defaultCodec the codec to use when codec is not set in conf
+   * @return codec specified in hadoop config
+   */
+  public abstract Class getHadoopOutputCompressorClass(Class defaultCodec);
+
+  /**
+   * @return configuration of the job
+   */
+  public abstract Configuration getConfiguration();
+
+  /**
+   * use mapred api to read codec config
+   * @return  MapredCodecConfig
+   */
+  public static CodecConfig from(JobConf jobConf) {
+    return new MapredCodecConfig(jobConf);
+  }
+
+  /**
+   * use mapreduce api to read codec config
+   * @return MapreduceCodecConfig
+   */
+  public static CodecConfig from(TaskAttemptContext context) {
+    return new MapreduceCodecConfig(context);
+  }
+
+  public static boolean isParquetCompressionSet(Configuration conf) {
+    return conf.get(ParquetOutputFormat.COMPRESSION) != null;
+  }
+
+  public static CompressionCodecName getParquetCompressionCodec(Configuration configuration) {
+    return CompressionCodecName.fromConf(configuration.get(ParquetOutputFormat.COMPRESSION, UNCOMPRESSED.name()));
+  }
+
+  public CompressionCodecName getCodec() {
+    CompressionCodecName codec;
+    Configuration configuration = getConfiguration();
+    if (isParquetCompressionSet(configuration)) { // explicit parquet config
+      codec = getParquetCompressionCodec(configuration);
+    } else if (isHadoopCompressionSet()) { // from hadoop config
+      codec = getHadoopCompressionCodec();
+    } else {
+      if (INFO) LOG.info("Compression set to false");
+      codec = CompressionCodecName.UNCOMPRESSED;
+    }
+
+    if (INFO) LOG.info("Compression: " + codec.name());
+    return codec;
+  }
+
+  private CompressionCodecName getHadoopCompressionCodec() {
+    CompressionCodecName codec;
+    try {
+      // find the right codec
+      Class<?> codecClass = getHadoopOutputCompressorClass(CompressionCodecName.UNCOMPRESSED.getHadoopCompressionCodecClass());
+      if (INFO) LOG.info("Compression set through hadoop codec: " + codecClass.getName());
+      codec = CompressionCodecName.fromCompressionCodec(codecClass);
+    } catch (CompressionCodecNotSupportedException e) {
+      if (WARN)
+        LOG.warn("codec defined in hadoop config is not supported by parquet [" + e.getCodecClass().getName() + "] and will use UNCOMPRESSED", e);
+      codec = CompressionCodecName.UNCOMPRESSED;
+    } catch (IllegalArgumentException e) {
+      if (WARN) LOG.warn("codec class not found: " + e.getMessage(), e);
+      codec = CompressionCodecName.UNCOMPRESSED;
+    }
+    return codec;
+  }
+
+  /**
+   * Access codec related configurations in mapreduce API
+   */
+  private static class MapreduceCodecConfig extends CodecConfig {
+    private final TaskAttemptContext context;
+
+    public MapreduceCodecConfig(TaskAttemptContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public boolean isHadoopCompressionSet() {
+      return FileOutputFormat.getCompressOutput(context);
+    }
+
+    @Override
+    public Class getHadoopOutputCompressorClass(Class defaultCodec) {
+      return FileOutputFormat.getOutputCompressorClass(context, defaultCodec);
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return ContextUtil.getConfiguration(context);
+    }
+  }
+
+  /**
+   * Access codec related configurations in mapred API
+   */
+  private static class MapredCodecConfig extends CodecConfig {
+    private final JobConf conf;
+
+    public MapredCodecConfig(JobConf conf) {
+      this.conf = conf;
+    }
+
+    @Override
+    public boolean isHadoopCompressionSet() {
+      return org.apache.hadoop.mapred.FileOutputFormat.getCompressOutput(conf);
+    }
+
+    @Override
+    public Class getHadoopOutputCompressorClass(Class defaultCodec) {
+      return org.apache.hadoop.mapred.FileOutputFormat.getOutputCompressorClass(conf, defaultCodec);
+    }
+
+    @Override
+    public Configuration getConfiguration() {
+      return conf;
+    }
+  }
+}
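
A sketch of the resolution order implemented by getCodec above: an explicit parquet.compression setting wins over the generic hadoop output-compression flags, and with neither set the result is UNCOMPRESSED.

import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.codec.CodecConfig;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CodecConfigExample {
  public static void main(String[] args) {
    // Explicit parquet config: takes precedence over any hadoop compression settings.
    JobConf explicit = new JobConf();
    explicit.set(ParquetOutputFormat.COMPRESSION, CompressionCodecName.GZIP.name());
    System.out.println(CodecConfig.from(explicit).getCodec()); // GZIP

    // Nothing set: falls through to UNCOMPRESSED.
    System.out.println(CodecConfig.from(new JobConf()).getCodec()); // UNCOMPRESSED
  }
}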

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
new file mode 100644
index 0000000..9657cc1
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/CompressionCodecNotSupportedException.java
@@ -0,0 +1,36 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+/**
+ * This exception will be thrown when the codec is not supported by parquet, meaning there is no
+ * matching codec defined in {@link org.apache.parquet.hadoop.metadata.CompressionCodecName}
+ */
+public class CompressionCodecNotSupportedException extends RuntimeException {
+  private final Class codecClass;
+
+  public CompressionCodecNotSupportedException(Class codecClass) {
+    super("codec not supported: " + codecClass.getName());
+    this.codecClass = codecClass;
+  }
+
+  public Class getCodecClass() {
+    return codecClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressorStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressorStream.java
new file mode 100644
index 0000000..2fb17d3
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressorStream.java
@@ -0,0 +1,50 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.CompressorStream;
+
+/**
+ * CompressorStream class that should be used instead of the default hadoop CompressorStream
+ * object. Hadoop's compressor adds blocking on top of the compression codec. We don't want
+ * that since our pages already provide the necessary blocking.
+ */
+public class NonBlockedCompressorStream extends CompressorStream {
+  public NonBlockedCompressorStream(OutputStream stream, Compressor compressor, int bufferSize) {
+    super(stream, compressor, bufferSize);
+  }
+  
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    // Sanity checks
+    if (compressor.finished()) {
+      throw new IOException("write beyond end of stream");
+    }
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    compressor.setInput(b, off, len);    
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
new file mode 100644
index 0000000..2740e86
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
@@ -0,0 +1,57 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DecompressorStream;
+
+/**
+ * DecompressorStream class that should be used instead of the default hadoop DecompressorStream
+ * object. Hadoop's compressor adds blocking on top of the compression codec. We don't want
+ * that since our pages already provide the necessary blocking.
+ */
+public class NonBlockedDecompressorStream extends DecompressorStream {
+  private boolean inputHandled;
+  
+  public NonBlockedDecompressorStream(InputStream stream, Decompressor decompressor, int bufferSize) throws IOException {
+    super(stream, decompressor, bufferSize);
+  }
+  
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+	if (!inputHandled) {
+	  // Send all the compressed input to the decompressor.
+	  while (true) {
+		int compressedBytes = getCompressedData();
+		if (compressedBytes == -1) break;
+		decompressor.setInput(buffer, 0, compressedBytes);
+	  }
+	  inputHandled = true;
+	}
+	
+	int decompressedBytes = decompressor.decompress(b, off, len);
+	if (decompressor.finished()) {
+	  decompressor.reset();
+	}
+	return decompressedBytes;
+  }
+}
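
A corresponding read-side sketch (again illustrative; the input is assumed to be a single raw snappy block, which is what NonBlockedCompressorStream produces):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.hadoop.codec.NonBlockedDecompressorStream;
import org.apache.parquet.hadoop.codec.SnappyDecompressor;
import org.xerial.snappy.Snappy;

public class NonBlockedDecompressorStreamExample {
  public static void main(String[] args) throws IOException {
    byte[] original = {1, 2, 3, 4, 5, 6, 7, 8};
    byte[] block = Snappy.compress(original); // one raw snappy block, no framing

    NonBlockedDecompressorStream in = new NonBlockedDecompressorStream(
        new ByteArrayInputStream(block), new SnappyDecompressor(), 4 * 1024);

    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
    byte[] buf = new byte[4 * 1024];
    int n;
    while ((n = in.read(buf, 0, buf.length)) > 0) { // a return of 0 means the block is fully drained
      decompressed.write(buf, 0, n);
    }
    in.close();
    // decompressed.toByteArray() now equals 'original'
  }
}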

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCodec.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCodec.java
new file mode 100644
index 0000000..8b5ff2a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCodec.java
@@ -0,0 +1,105 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+
+/**
+ * Snappy compression codec for Parquet.  We do not use the default Hadoop
+ * codec because it adds a blocking structure around the raw snappy compression
+ * algorithm.  That blocking helps Hadoop bound the size of compression blocks
+ * in its own file formats (e.g. SequenceFile), but it is undesirable for Parquet,
+ * where the data page already provides that structure.
+ */
+public class SnappyCodec implements Configurable, CompressionCodec {
+  private Configuration conf;
+  // Hadoop config for how big to make intermediate buffers.
+  private final String BUFFER_SIZE_CONFIG = "io.file.buffer.size";
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Compressor createCompressor() {
+    return new SnappyCompressor();
+  }
+
+  @Override
+  public Decompressor createDecompressor() {
+    return new SnappyDecompressor();
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream stream)
+      throws IOException {
+    return createInputStream(stream, createDecompressor());
+  }
+
+  @Override
+  public CompressionInputStream createInputStream(InputStream stream,
+      Decompressor decompressor) throws IOException {
+    return new NonBlockedDecompressorStream(stream, decompressor,
+        conf.getInt(BUFFER_SIZE_CONFIG, 4*1024));
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream stream)
+      throws IOException {
+    return createOutputStream(stream, createCompressor());
+  }
+
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream stream,
+      Compressor compressor) throws IOException {
+    return new NonBlockedCompressorStream(stream, compressor, 
+        conf.getInt(BUFFER_SIZE_CONFIG, 4*1024));
+  }
+
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return SnappyCompressor.class;
+  }
+
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return SnappyDecompressor.class;
+  }
+
+  @Override
+  public String getDefaultExtension() {
+    return ".snappy";
+  }  
+}
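
A round-trip sketch through the codec API (class and variable names are illustrative; setConf must be called before creating streams because the buffer size is read from the configuration):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.parquet.hadoop.codec.SnappyCodec;

public class SnappyCodecRoundTrip {
  public static void main(String[] args) throws IOException {
    SnappyCodec codec = new SnappyCodec();
    codec.setConf(new Configuration()); // required: the buffer size comes from the conf

    byte[] data = "hello parquet".getBytes(StandardCharsets.UTF_8);

    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(sink);
    out.write(data);
    out.close(); // the buffered input is compressed as a single block here

    CompressionInputStream in =
        codec.createInputStream(new ByteArrayInputStream(sink.toByteArray()));
    byte[] roundTripped = new byte[data.length];
    int n = in.read(roundTripped, 0, roundTripped.length);
    in.close();
    // n == data.length and roundTripped now equals 'data'
  }
}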

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
new file mode 100644
index 0000000..f099896
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -0,0 +1,161 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+import org.xerial.snappy.Snappy;
+
+import org.apache.parquet.Preconditions;
+
+/**
+ * This class is a wrapper around the snappy compressor. It always consumes the
+ * entire input in setInput and compresses it as one compressed block.
+ */
+public class SnappyCompressor implements Compressor {
+  // Buffer for compressed output. This buffer grows as necessary.
+  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
+
+  // Buffer for uncompressed input. This buffer grows as necessary.
+  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
+
+  private long bytesRead = 0L;
+  private long bytesWritten = 0L;
+  private boolean finishCalled = false;
+
+  /**
+   * Fills specified buffer with compressed data. Returns actual number
+   * of bytes of compressed data. A return value of 0 indicates that
+   * needsInput() should be called in order to determine if more input
+   * data is required.
+   *
+   * @param buffer   Buffer into which compressed data is written
+   * @param off Start offset in the buffer
+   * @param len Maximum number of bytes to write
+   * @return The actual number of bytes of compressed data.
+   */
+  @Override
+  public synchronized int compress(byte[] buffer, int off, int len) throws IOException {
+    SnappyUtil.validateBuffer(buffer, off, len);
+
+    if (needsInput()) {
+      // No buffered output bytes and no input to consume, need more input
+      return 0;
+    }
+
+    if (!outputBuffer.hasRemaining()) {
+      // There is uncompressed input, compress it now
+      int maxOutputSize = Snappy.maxCompressedLength(inputBuffer.position());
+      if (maxOutputSize > outputBuffer.capacity()) {
+        outputBuffer = ByteBuffer.allocateDirect(maxOutputSize);
+      }
+      // Reset the previous outputBuffer
+      outputBuffer.clear();
+      inputBuffer.limit(inputBuffer.position());
+      inputBuffer.position(0);
+
+      int size = Snappy.compress(inputBuffer, outputBuffer);
+      outputBuffer.limit(size);
+      inputBuffer.limit(0);
+      inputBuffer.rewind();
+    }
+
+    // Return compressed output up to 'len'
+    int numBytes = Math.min(len, outputBuffer.remaining());
+    outputBuffer.get(buffer, off, numBytes);
+    bytesWritten += numBytes;
+    return numBytes;
+  }
+
+  @Override
+  public synchronized void setInput(byte[] buffer, int off, int len) {
+    SnappyUtil.validateBuffer(buffer, off, len);
+
+    Preconditions.checkArgument(!outputBuffer.hasRemaining(),
+        "Output buffer should be empty. Caller must call compress()");
+
+    if (inputBuffer.capacity() - inputBuffer.position() < len) {
+      ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+      inputBuffer.rewind();
+      tmp.put(inputBuffer);
+      inputBuffer = tmp;
+    } else {
+      inputBuffer.limit(inputBuffer.position() + len);
+    }
+
+    // Append the current bytes to the input buffer
+    inputBuffer.put(buffer, off, len);
+    bytesRead += len;
+  }
+
+  @Override
+  public void end() {
+    // No-op		
+  }
+
+  @Override
+  public void finish() {
+    finishCalled = true;
+  }
+
+  @Override
+  public synchronized boolean finished() {
+    return finishCalled && inputBuffer.position() == 0 && !outputBuffer.hasRemaining();
+  }
+
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  // We want to compress all the input in one go, so we keep accepting input
+  // until finish() is called.
+  @Override
+  public synchronized boolean needsInput() {
+    return !finishCalled;
+  }
+
+  @Override
+  public void reinit(Configuration c) {
+    reset();		
+  }
+
+  @Override
+  public synchronized void reset() {
+    finishCalled = false;
+    bytesRead = bytesWritten = 0;
+    inputBuffer.rewind();
+    outputBuffer.rewind();
+    inputBuffer.limit(0);
+    outputBuffer.limit(0);
+  }
+
+  @Override
+  public void setDictionary(byte[] dictionary, int off, int len) {
+    // No-op		
+  }
+}
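
A minimal sketch of the compressor contract described in the javadoc above: buffer input with setInput, mark the end with finish(), then drain with compress() until finished() (class and variable names are illustrative):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.parquet.hadoop.codec.SnappyCompressor;

public class SnappyCompressorExample {
  public static void main(String[] args) throws IOException {
    byte[] input = "raw page contents".getBytes(StandardCharsets.UTF_8);

    SnappyCompressor compressor = new SnappyCompressor();
    compressor.setInput(input, 0, input.length); // buffers the whole input
    compressor.finish();                         // no more input will arrive

    byte[] out = new byte[64 * 1024];
    int total = 0;
    while (!compressor.finished()) {
      total += compressor.compress(out, total, out.length - total);
    }
    // out[0..total) is a single snappy block holding the entire input
    compressor.end();
    System.out.println("compressed " + input.length + " bytes into " + total);
  }
}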

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
new file mode 100644
index 0000000..8631267
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -0,0 +1,150 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.xerial.snappy.Snappy;
+
+import org.apache.parquet.Preconditions;
+
+public class SnappyDecompressor implements Decompressor {
+  // Buffer for uncompressed output. This buffer grows as necessary.
+  private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
+
+  // Buffer for compressed input. This buffer grows as necessary.
+  private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
+
+  private boolean finished;
+  
+  /**
+   * Fills specified buffer with uncompressed data. Returns actual number
+   * of bytes of uncompressed data. A return value of 0 indicates that
+   * {@link #needsInput()} should be called in order to determine if more
+   * input data is required.
+   *
+   * @param buffer   Buffer into which uncompressed data is written
+   * @param off Start offset in the buffer
+   * @param len Maximum number of bytes to write
+   * @return The actual number of bytes of uncompressed data.
+   * @throws IOException
+   */
+  @Override
+  public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
+    SnappyUtil.validateBuffer(buffer, off, len);
+    if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
+      return 0;
+    }
+    
+    if (!outputBuffer.hasRemaining()) {
+      inputBuffer.rewind();
+      Preconditions.checkArgument(inputBuffer.position() == 0,
+          "Invalid input buffer position %s, expected 0", inputBuffer.position());
+      Preconditions.checkArgument(outputBuffer.position() == 0,
+          "Invalid output buffer position %s, expected 0", outputBuffer.position());
+      // There is compressed input, decompress it now.
+      int decompressedSize = Snappy.uncompressedLength(inputBuffer);
+      if (decompressedSize > outputBuffer.capacity()) {
+        outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
+      }
+
+      // Reset the previous outputBuffer (i.e. set position to 0)
+      outputBuffer.clear();
+      int size = Snappy.uncompress(inputBuffer, outputBuffer);
+      outputBuffer.limit(size);
+      // We've decompressed the entire input, reset the input now
+      inputBuffer.clear();
+      inputBuffer.limit(0);
+      finished = true;
+    }
+
+    // Return uncompressed output up to 'len'
+    int numBytes = Math.min(len, outputBuffer.remaining());
+    outputBuffer.get(buffer, off, numBytes);
+    return numBytes;
+  }
+
+  /**
+   * Sets input data for decompression.
+   * This should be called if and only if {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
+   *
+   * @param buffer   Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] buffer, int off, int len) {
+    SnappyUtil.validateBuffer(buffer, off, len);
+
+    if (inputBuffer.capacity() - inputBuffer.position() < len) {
+      ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+      inputBuffer.rewind();
+      newBuffer.put(inputBuffer);
+      inputBuffer = newBuffer;      
+    } else {
+      inputBuffer.limit(inputBuffer.position() + len);
+    }
+    inputBuffer.put(buffer, off, len);
+  }
+
+  @Override
+  public void end() {
+    // No-op		
+  }
+
+  @Override
+  public synchronized boolean finished() {
+    return finished && !outputBuffer.hasRemaining();
+  }
+
+  @Override
+  public int getRemaining() {
+    return 0;
+  }
+
+  @Override
+  public synchronized boolean needsInput() {
+    return !inputBuffer.hasRemaining() && !outputBuffer.hasRemaining();
+  }
+
+  @Override
+  public synchronized void reset() {
+    finished = false;
+    inputBuffer.rewind();
+    outputBuffer.rewind();
+    inputBuffer.limit(0);
+    outputBuffer.limit(0);
+  }
+
+  @Override
+  public boolean needsDictionary() {
+    return false;
+  }
+
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    // No-op		
+  }
+}
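
The matching decompressor sketch, assuming the input is a single raw snappy block (illustrative names; snappy-java is used here only to produce test input):

import java.io.IOException;

import org.apache.parquet.hadoop.codec.SnappyDecompressor;
import org.xerial.snappy.Snappy;

public class SnappyDecompressorExample {
  public static void main(String[] args) throws IOException {
    byte[] original = {10, 20, 30, 40, 50};
    byte[] block = Snappy.compress(original); // one raw snappy block

    SnappyDecompressor decompressor = new SnappyDecompressor();
    decompressor.setInput(block, 0, block.length); // hand over the whole compressed block

    byte[] out = new byte[original.length];
    int n = decompressor.decompress(out, 0, out.length);
    // n == original.length and 'out' now equals 'original'
    System.out.println("decompressed " + n + " bytes");
  }
}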

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyUtil.java
new file mode 100644
index 0000000..41e2c4d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyUtil.java
@@ -0,0 +1,33 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.codec;
+
+import org.apache.parquet.Preconditions;
+
+/**
+ * Utilities for SnappyCompressor and SnappyDecompressor.
+ */
+public class SnappyUtil {
+  public static void validateBuffer(byte[] buffer, int off, int len) {
+    Preconditions.checkNotNull(buffer, "buffer");
+    Preconditions.checkArgument(off >= 0 && len >= 0 && off <= buffer.length - len,
+        "Invalid buffer offset or length: buffer.length=%s off=%s len=%s",
+        buffer.length, off, len);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleInputFormat.java
new file mode 100644
index 0000000..791cb04
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleInputFormat.java
@@ -0,0 +1,38 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.example;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+
+/**
+ * Example InputFormat to read Parquet files.
+ *
+ * This InputFormat uses a rather inefficient data model but works independently of higher-level abstractions.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ExampleInputFormat extends ParquetInputFormat<Group> {
+
+  public ExampleInputFormat() {
+    super(GroupReadSupport.class);
+  }
+
+}
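
A minimal job-setup sketch using this InputFormat (assumes a Hadoop 2.x mapreduce job; the mapper, which would receive (Void, Group) record pairs, is omitted, and all names are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.parquet.hadoop.example.ExampleInputFormat;

public class ExampleInputFormatJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "read-parquet-as-groups");
    job.setJarByClass(ExampleInputFormatJob.class);

    job.setInputFormatClass(ExampleInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // The mapper (omitted) receives (Void, Group) record pairs.

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}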

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleOutputFormat.java
new file mode 100644
index 0000000..08184da
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleOutputFormat.java
@@ -0,0 +1,62 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.example;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * An example OutputFormat.
+ *
+ * The schema must be provided up front.
+ * @see ExampleOutputFormat#setSchema(Job, MessageType)
+ * @see GroupWriteSupport#PARQUET_EXAMPLE_SCHEMA
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ExampleOutputFormat extends ParquetOutputFormat<Group> {
+
+  /**
+   * Sets the schema to be written into the job configuration.
+   * @param job the job whose configuration will hold the schema
+   * @param schema the schema of the data
+   */
+  public static void setSchema(Job job, MessageType schema) {
+    GroupWriteSupport.setSchema(schema, ContextUtil.getConfiguration(job));
+  }
+
+  /**
+   * Retrieves the schema from the job configuration.
+   * @param job the job whose configuration holds the schema
+   * @return the schema
+   */
+  public static MessageType getSchema(Job job) {
+    return GroupWriteSupport.getSchema(ContextUtil.getConfiguration(job));
+  }
+
+  public ExampleOutputFormat() {
+    super(new GroupWriteSupport());
+  }
+}
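
A matching job-setup sketch for the output side (assumes a Hadoop 2.x mapreduce job; the schema string and paths are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.parquet.hadoop.example.ExampleOutputFormat;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ExampleOutputFormatJob {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "write-groups-as-parquet");
    job.setJarByClass(ExampleOutputFormatJob.class);

    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; optional binary name (UTF8); }");
    ExampleOutputFormat.setSchema(job, schema); // stored in the job conf for GroupWriteSupport

    job.setOutputFormatClass(ExampleOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(args[0]));
    // The mapper or reducer (omitted) emits (Void, Group) pairs built against 'schema'.

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}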

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
new file mode 100644
index 0000000..c49b681
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupReadSupport.java
@@ -0,0 +1,49 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.example;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+public class GroupReadSupport extends ReadSupport<Group> {
+
+  @Override
+  public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(
+      Configuration configuration, Map<String, String> keyValueMetaData,
+      MessageType fileSchema) {
+    String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA);
+    MessageType requestedProjection = getSchemaForRead(fileSchema, partialSchemaString);
+    return new ReadContext(requestedProjection);
+  }
+
+  @Override
+  public RecordMaterializer<Group> prepareForRead(Configuration configuration,
+      Map<String, String> keyValueMetaData, MessageType fileSchema,
+      org.apache.parquet.hadoop.api.ReadSupport.ReadContext readContext) {
+    return new GroupRecordConverter(readContext.getRequestedSchema());
+  }
+
+}
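
A standalone read sketch using GroupReadSupport through the ParquetReader builder (assuming that builder API is available in this version; the projection schema is optional and illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class GroupReadSupportExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Optional projection: only these columns are materialized; omit to read the full schema.
    conf.set(ReadSupport.PARQUET_READ_SCHEMA, "message projection { required int32 id; }");

    ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path(args[0]))
        .withConf(conf)
        .build();
    Group group;
    while ((group = reader.read()) != null) {
      System.out.println(group);
    }
    reader.close();
  }
}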

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
new file mode 100644
index 0000000..0eb0cb8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java
@@ -0,0 +1,67 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.example;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+public class GroupWriteSupport extends WriteSupport<Group> {
+
+  public static final String PARQUET_EXAMPLE_SCHEMA = "parquet.example.schema";
+
+  public static void setSchema(MessageType schema, Configuration configuration) {
+    configuration.set(PARQUET_EXAMPLE_SCHEMA, schema.toString());
+  }
+
+  public static MessageType getSchema(Configuration configuration) {
+    return parseMessageType(checkNotNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA));
+  }
+
+  private MessageType schema;
+  private GroupWriter groupWriter;
+
+  @Override
+  public org.apache.parquet.hadoop.api.WriteSupport.WriteContext init(Configuration configuration) {
+    schema = getSchema(configuration);
+    return new WriteContext(schema, new HashMap<String, String>());
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    groupWriter = new GroupWriter(recordConsumer, schema);
+  }
+
+  @Override
+  public void write(Group record) {
+    groupWriter.write(record);
+  }
+
+}
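
A standalone write sketch using GroupWriteSupport with one of the ParquetWriter constructors that accepts a Configuration, so the schema placed on the conf is visible to init() (names and schema are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class GroupWriteSupportExample {
  public static void main(String[] args) throws Exception {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; optional binary name (UTF8); }");

    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf); // init() reads the schema back from this conf

    ParquetWriter<Group> writer = new ParquetWriter<Group>(
        new Path(args[0]),
        new GroupWriteSupport(),
        CompressionCodecName.SNAPPY,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE, // dictionary page size
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
        ParquetWriter.DEFAULT_WRITER_VERSION,
        conf);

    SimpleGroupFactory groups = new SimpleGroupFactory(schema);
    writer.write(groups.newGroup().append("id", 1).append("name", "parquet"));
    writer.close();
  }
}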

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/Container.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/Container.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/Container.java
new file mode 100644
index 0000000..ecdf685
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/Container.java
@@ -0,0 +1,37 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.mapred;
+
+/**
+ * A simple mutable container holding a single object that you can get and set.
+ * @param <T> the type of the contained object
+ */
+public class Container<T> {
+
+  T object;
+
+  public void set(T object) {
+    this.object = object;
+  }
+
+  public T get() {
+    return object;
+  }
+
+}

