parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject [11/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:08 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
new file mode 100644
index 0000000..6b69bc9
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -0,0 +1,208 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.mapred;
+
+import static java.lang.Boolean.TRUE;
+import static java.util.Arrays.asList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.ParquetRecordReader;
+
+/**
+ * Input format for the old {@code org.apache.hadoop.mapred} API that delegates to
+ * {@link ParquetInputFormat}. Keys are always {@code null}; every record is handed to the
+ * framework inside a {@link Container}.
+ *
+ * @param <V> type of the materialized records
+ */
+public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.FileInputFormat<Void, Container<V>> {
+
+  protected ParquetInputFormat<V> realInputFormat = new ParquetInputFormat<V>();
+
+  /**
+   * Creates a reader for the given split.
+   *
+   * @param split a split produced by {@link #getSplits(JobConf, int)}
+   * @param job the job configuration
+   * @param reporter the task reporter, forwarded to the underlying reader
+   * @return a reader wrapping a {@link ParquetRecordReader}
+   * @throws IOException if the underlying reader cannot be initialized or read
+   */
+  @Override
+  public RecordReader<Void, Container<V>> getRecordReader(InputSplit split, JobConf job,
+                  Reporter reporter) throws IOException {
+    return new RecordReaderWrapper<V>(split, job, reporter);
+  }
+
+  /**
+   * Computes the splits of the job. With task-side metadata the plain file splits of the
+   * parent class are used; otherwise the footers are read here and the real input format
+   * computes the splits, each of which is wrapped for the old API.
+   *
+   * @param job the job configuration
+   * @param numSplits hint passed to the parent class (only used with task-side metadata)
+   * @return the splits, or {@code null} if the real input format returned {@code null}
+   * @throws IOException if the input files or footers cannot be read
+   */
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    if (isTaskSideMetaData(job)) {
+      return super.getSplits(job, numSplits);
+    }
+
+    List<Footer> footers = getFooters(job);
+    List<ParquetInputSplit> splits = realInputFormat.getSplits(job, footers);
+    if (splits == null) {
+      return null;
+    }
+    InputSplit[] resultSplits = new InputSplit[splits.size()];
+    int i = 0;
+    for (ParquetInputSplit split : splits) {
+      resultSplits[i++] = new ParquetInputSplitWrapper(split);
+    }
+    return resultSplits;
+  }
+
+  /**
+   * @param job the job configuration
+   * @return the footers of the files listed for this job
+   * @throws IOException if listing the files or reading the footers fails
+   */
+  public List<Footer> getFooters(JobConf job) throws IOException {
+    return realInputFormat.getFooters(job, asList(super.listStatus(job)));
+  }
+
+  /**
+   * Adapts a {@link ParquetRecordReader} to the old {@link RecordReader} interface.
+   * The constructor eagerly reads the first record so that {@link #createValue()} can
+   * return a populated container.
+   */
+  private static class RecordReaderWrapper<V> implements RecordReader<Void, Container<V>> {
+
+    private ParquetRecordReader<V> realReader;
+    private long splitLen; // for getPos()
+
+    private Container<V> valueContainer = null;
+
+    private boolean firstRecord = false;
+    private boolean eof = false;
+
+    /**
+     * @param oldSplit a {@link ParquetInputSplitWrapper} or a {@link FileSplit}
+     * @param oldJobConf the job configuration
+     * @param reporter the task reporter
+     * @throws IOException if initialization or the first read fails, or the thread is interrupted
+     * @throws IllegalArgumentException if the split is of any other type
+     */
+    public RecordReaderWrapper(
+        InputSplit oldSplit, JobConf oldJobConf, Reporter reporter)
+        throws IOException {
+      splitLen = oldSplit.getLength();
+
+      try {
+        realReader = new ParquetRecordReader<V>(
+            ParquetInputFormat.<V>getReadSupportInstance(oldJobConf),
+            ParquetInputFormat.getFilter(oldJobConf));
+
+        if (oldSplit instanceof ParquetInputSplitWrapper) {
+          realReader.initialize(((ParquetInputSplitWrapper) oldSplit).realSplit, oldJobConf, reporter);
+        } else if (oldSplit instanceof FileSplit) {
+          realReader.initialize((FileSplit) oldSplit, oldJobConf, reporter);
+        } else {
+          throw new IllegalArgumentException(
+              "Invalid split (not a FileSplit or ParquetInputSplitWrapper): " + oldSplit);
+        }
+
+        // read once to gain access to key and value objects
+        if (realReader.nextKeyValue()) {
+          firstRecord = true;
+          valueContainer = new Container<V>();
+          valueContainer.set(realReader.getCurrentValue());
+        } else {
+          eof = true;
+        }
+      } catch (InterruptedException e) {
+        // restore the interrupt status so callers further up can still observe it
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      realReader.close();
+    }
+
+    /** @return always {@code null}: records have no key */
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    /**
+     * @return the container holding the first record, or {@code null} if the split
+     *         contained no record
+     */
+    @Override
+    public Container<V> createValue() {
+      return valueContainer;
+    }
+
+    /** @return an estimate of the position: the split length scaled by the progress */
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLen * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      try {
+        return realReader.getProgress();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+    }
+
+    /**
+     * Advances to the next record and stores it in {@code value}.
+     *
+     * @param key ignored
+     * @param value container receiving the record; may be {@code null}
+     * @return {@code true} if a record was produced, {@code false} at the end of the split
+     * @throws IOException if reading fails or the thread is interrupted
+     */
+    @Override
+    public boolean next(Void key, Container<V> value) throws IOException {
+      if (eof) {
+        return false;
+      }
+
+      try {
+        if (firstRecord) { // the first record was already read by the constructor
+          firstRecord = false;
+          // The container returned by createValue() already holds that record; any
+          // other container has to receive it explicitly or the record would be lost.
+          // NOTE(review): relies on getCurrentValue() still returning the first record
+          // until nextKeyValue() is called again.
+          if (value != null && value != valueContainer) {
+            value.set(realReader.getCurrentValue());
+          }
+          return true;
+        }
+
+        if (realReader.nextKeyValue()) {
+          if (value != null) {
+            value.set(realReader.getCurrentValue());
+          }
+          return true;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(e);
+      }
+
+      eof = true; // strictly not required, just for consistency
+      return false;
+    }
+  }
+
+  /**
+   * @param job the job configuration
+   * @return the value of {@link ParquetInputFormat#TASK_SIDE_METADATA}, {@code true} when unset
+   */
+  public static boolean isTaskSideMetaData(JobConf job) {
+    return job.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, TRUE);
+  }
+
+  /**
+   * Wraps a {@link ParquetInputSplit} so it can travel through the old API as an
+   * {@link InputSplit}; every operation is delegated to the wrapped split.
+   */
+  private static class ParquetInputSplitWrapper implements InputSplit {
+
+    ParquetInputSplit realSplit;
+
+    @SuppressWarnings("unused") // MapReduce instantiates this.
+    public ParquetInputSplitWrapper() {}
+
+    public ParquetInputSplitWrapper(ParquetInputSplit realSplit) {
+      this.realSplit = realSplit;
+    }
+
+    @Override
+    public long getLength() throws IOException {
+      return realSplit.getLength();
+    }
+
+    @Override
+    public String[] getLocations() throws IOException {
+      return realSplit.getLocations();
+    }
+
+    /** Replaces the wrapped split with a fresh one deserialized from {@code in}. */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      realSplit = new ParquetInputSplit();
+      realSplit.readFields(in);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      realSplit.write(out);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
new file mode 100644
index 0000000..33f589b
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetOutputFormat.java
@@ -0,0 +1,119 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.ParquetRecordWriter;
+import org.apache.parquet.hadoop.codec.CodecConfig;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class DeprecatedParquetOutputFormat<V> extends org.apache.hadoop.mapred.FileOutputFormat<Void, V> {
+
+  public static void setWriteSupportClass(Configuration configuration,  Class<?> writeSupportClass) {
+    configuration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, writeSupportClass.getName());
+  }
+
+  public static void setBlockSize(Configuration configuration, int blockSize) {
+    configuration.setInt(ParquetOutputFormat.BLOCK_SIZE, blockSize);
+  }
+
+  public static void setPageSize(Configuration configuration, int pageSize) {
+    configuration.setInt(ParquetOutputFormat.PAGE_SIZE, pageSize);
+  }
+
+  public static void setCompression(Configuration configuration, CompressionCodecName compression) {
+    configuration.set(ParquetOutputFormat.COMPRESSION, compression.name());
+  }
+
+  public static void setEnableDictionary(Configuration configuration, boolean enableDictionary) {
+    configuration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, enableDictionary);
+  }
+
+  public static void setAsOutputFormat(JobConf jobConf) {
+    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+    jobConf.setOutputCommitter(MapredParquetOutputCommitter.class);
+  }
+
+  private CompressionCodecName getCodec(final JobConf conf) {
+    return CodecConfig.from(conf).getCodec();
+  }
+
+  private static Path getDefaultWorkFile(JobConf conf, String name, String extension) {
+    String file = getUniqueName(conf, name) + extension;
+    return new Path(getWorkOutputPath(conf), file);
+  }
+
+  protected ParquetOutputFormat<V> realOutputFormat = new ParquetOutputFormat<V>();
+
+  @Override
+  public RecordWriter<Void, V> getRecordWriter(FileSystem fs,
+      JobConf conf, String name, Progressable progress) throws IOException {
+    return new RecordWriterWrapper(realOutputFormat, fs, conf, name, progress);
+  }
+
+  private class RecordWriterWrapper implements RecordWriter<Void, V> {
+
+    private ParquetRecordWriter<V> realWriter;
+
+    public RecordWriterWrapper(ParquetOutputFormat<V> realOutputFormat,
+        FileSystem fs, JobConf conf, String name, Progressable progress) throws IOException {
+
+      CompressionCodecName codec = getCodec(conf);
+      String extension = codec.getExtension() + ".parquet";
+      Path file = getDefaultWorkFile(conf, name, extension);
+
+      try {
+        realWriter = (ParquetRecordWriter<V>) realOutputFormat.getRecordWriter(conf, file, codec);
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      try {
+        realWriter.close(null);
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void write(Void key, V value) throws IOException {
+      try {
+        realWriter.write(key, value);
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        throw new IOException(e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
new file mode 100644
index 0000000..0504db8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/MapredParquetOutputCommitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.*;
+import org.apache.parquet.hadoop.ParquetOutputCommitter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+import java.io.IOException;
+
+/**
+ * Makes {@link ParquetOutputCommitter}'s metadata-file writing available to jobs
+ * that use the old {@code org.apache.hadoop.mapred} API.
+ *
+ * @author Tianshuo Deng
+ */
+public class MapredParquetOutputCommitter extends FileOutputCommitter {
+
+  /**
+   * Runs the regular job commit, then writes the Parquet metadata file for the
+   * job's output path.
+   *
+   * @param jobContext context of the job being committed
+   * @throws IOException if the commit or the metadata write fails
+   */
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    super.commitJob(jobContext);
+    final Configuration configuration = ContextUtil.getConfiguration(jobContext);
+    ParquetOutputCommitter.writeMetaDataFile(
+        configuration,
+        FileOutputFormat.getOutputPath(new JobConf(configuration)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
new file mode 100644
index 0000000..13e6fa8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -0,0 +1,123 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Block metadata stored in the footer and passed in an InputSplit
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BlockMetaData {
+
+  private List<ColumnChunkMetaData> columns = new ArrayList<ColumnChunkMetaData>();
+  private long rowCount;
+  private long totalByteSize;
+  private String path;
+
+  public BlockMetaData() {
+  }
+
+
+  /**
+   * @param path the path to the file containing the data. Or null if same file the metadata was found
+   */
+  public void setPath(String path) {
+    this.path = path;
+  }
+
+  /**
+   * @return the path relative to the parent of this file where the data is. Or null if it is in the same file.
+   */
+  public String getPath() {
+    return path;
+  }
+
+  /**
+   * @return the rowCount
+   */
+  public long getRowCount() {
+    return rowCount;
+  }
+
+  /**
+   * @param rowCount the rowCount to set
+   */
+  public void setRowCount(long rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  /**
+   * @return the totalByteSize
+   */
+  public long getTotalByteSize() {
+    return totalByteSize;
+  }
+
+  /**
+   * @param totalByteSize the totalByteSize to set
+   */
+  public void setTotalByteSize(long totalByteSize) {
+    this.totalByteSize = totalByteSize;
+  }
+
+  /**
+   *
+   * @param column the metadata for a column
+   */
+  public void addColumn(ColumnChunkMetaData column) {
+    columns.add(column);
+  }
+
+  /**
+   *
+   * @return the metadata for columns
+   */
+  public List<ColumnChunkMetaData> getColumns() {
+    return Collections.unmodifiableList(columns);
+  }
+
+  /**
+   *
+   * @return the starting pos of first column
+   */
+  public long getStartingPos() {
+    return getColumns().get(0).getStartingPos();
+  }
+  @Override
+  public String toString() {
+    return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
+  }
+
+  /**
+   * @return the compressed size of all columns
+   */
+  public long getCompressedSize() {
+    long totalSize = 0;
+    for (ColumnChunkMetaData col : getColumns()) {
+      totalSize += col.getTotalSize();
+    }
+    return totalSize;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
new file mode 100644
index 0000000..0c2fd4d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -0,0 +1,389 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.Set;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.BooleanStatistics;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Column meta data for a block stored in the file footer and passed in the InputSplit
+ * @author Julien Le Dem
+ */
+abstract public class ColumnChunkMetaData {
+
+  @Deprecated
+  public static ColumnChunkMetaData get(
+      ColumnPath path,
+      PrimitiveTypeName type,
+      CompressionCodecName codec,
+      Set<Encoding> encodings,
+      long firstDataPage,
+      long dictionaryPageOffset,
+      long valueCount,
+      long totalSize,
+      long totalUncompressedSize) {
+    // to save space we store those always positive longs in ints when they fit.
+    if (positiveLongFitsInAnInt(firstDataPage)
+        && positiveLongFitsInAnInt(dictionaryPageOffset)
+        && positiveLongFitsInAnInt(valueCount)
+        && positiveLongFitsInAnInt(totalSize)
+        && positiveLongFitsInAnInt(totalUncompressedSize)) {
+      return new IntColumnChunkMetaData(
+          path, type, codec, encodings,
+          new BooleanStatistics(),
+          firstDataPage,
+          dictionaryPageOffset,
+          valueCount,
+          totalSize,
+          totalUncompressedSize);
+    } else {
+      return new LongColumnChunkMetaData(
+          path, type, codec, encodings,
+          new BooleanStatistics(),
+          firstDataPage,
+          dictionaryPageOffset,
+          valueCount,
+          totalSize,
+          totalUncompressedSize);
+    }
+  }
+
+
+  public static ColumnChunkMetaData get(
+      ColumnPath path,
+      PrimitiveTypeName type,
+      CompressionCodecName codec,
+      Set<Encoding> encodings,
+      Statistics statistics,
+      long firstDataPage,
+      long dictionaryPageOffset,
+      long valueCount,
+      long totalSize,
+      long totalUncompressedSize) {
+    // to save space we store those always positive longs in ints when they fit.
+    if (positiveLongFitsInAnInt(firstDataPage)
+        && positiveLongFitsInAnInt(dictionaryPageOffset)
+        && positiveLongFitsInAnInt(valueCount)
+        && positiveLongFitsInAnInt(totalSize)
+        && positiveLongFitsInAnInt(totalUncompressedSize)) {
+      return new IntColumnChunkMetaData(
+          path, type, codec, encodings,
+          statistics,
+          firstDataPage,
+          dictionaryPageOffset,
+          valueCount,
+          totalSize,
+          totalUncompressedSize);
+    } else {
+      return new LongColumnChunkMetaData(
+          path, type, codec, encodings,
+          statistics,
+          firstDataPage,
+          dictionaryPageOffset,
+          valueCount,
+          totalSize,
+          totalUncompressedSize);
+    }
+  }
+
+  /**
+   * @return the offset of the first byte in the chunk
+   */
+  public long getStartingPos() {
+    long dictionaryPageOffset = getDictionaryPageOffset();
+    long firstDataPageOffset = getFirstDataPageOffset();
+    if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
+      // if there's a dictionary and it's before the first data page, start from there
+      return dictionaryPageOffset;
+    }
+    return firstDataPageOffset;
+  }
+
+  /**
+   * checks that a positive long value fits in an int.
+   * (reindexed on Integer.MIN_VALUE)
+   * @param value
+   * @return whether it fits
+   */
+  protected static boolean positiveLongFitsInAnInt(long value) {
+    return (value >= 0) && (value + Integer.MIN_VALUE <= Integer.MAX_VALUE);
+  }
+
+  // we save 3 references by storing together the column properties that have few distinct values
+  private final ColumnChunkProperties properties;
+
+  protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
+    this.properties = columnChunkProperties;
+  }
+
+  public CompressionCodecName getCodec() {
+    return properties.getCodec();
+  }
+
+  /**
+   *
+   * @return column identifier
+   */
+  public ColumnPath getPath() {
+    return properties.getPath();
+  }
+
+  /**
+   * @return type of the column
+   */
+  public PrimitiveTypeName getType() {
+    return properties.getType();
+  }
+
+  /**
+   * @return start of the column data offset
+   */
+  abstract public long getFirstDataPageOffset();
+
+  /**
+   * @return the location of the dictionary page if any
+   */
+  abstract public long getDictionaryPageOffset();
+
+  /**
+   * @return count of values in this block of the column
+   */
+  abstract public long getValueCount();
+
+  /**
+   * @return the totalUncompressedSize
+   */
+  abstract public long getTotalUncompressedSize();
+
+  /**
+   * @return the totalSize
+   */
+  abstract public long getTotalSize();
+
+  /**
+   * @return the stats for this column
+   */
+  abstract public Statistics getStatistics();
+
+  /**
+   * @return all the encodings used in this column
+   */
+  public Set<Encoding> getEncodings() {
+    return properties.getEncodings();
+  }
+
+
+  @Override
+  public String toString() {
+    return "ColumnMetaData{" + properties.toString() + ", " + getFirstDataPageOffset() + "}";
+  }
+}
+
+class IntColumnChunkMetaData extends ColumnChunkMetaData {
+
+  private final int firstDataPage;
+  private final int dictionaryPageOffset;
+  private final int valueCount;
+  private final int totalSize;
+  private final int totalUncompressedSize;
+  private final Statistics statistics;
+
+  /**
+   * @param path column identifier
+   * @param type type of the column
+   * @param codec
+   * @param encodings
+   * @param statistics
+   * @param firstDataPage
+   * @param dictionaryPageOffset
+   * @param valueCount
+   * @param totalSize
+   * @param totalUncompressedSize
+   */
+  IntColumnChunkMetaData(
+      ColumnPath path,
+      PrimitiveTypeName type,
+      CompressionCodecName codec,
+      Set<Encoding> encodings,
+      Statistics statistics,
+      long firstDataPage,
+      long dictionaryPageOffset,
+      long valueCount,
+      long totalSize,
+      long totalUncompressedSize) {
+    super(ColumnChunkProperties.get(path, type, codec, encodings));
+    this.firstDataPage = positiveLongToInt(firstDataPage);
+    this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset);
+    this.valueCount = positiveLongToInt(valueCount);
+    this.totalSize = positiveLongToInt(totalSize);
+    this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize);
+    this.statistics = statistics;
+  }
+
+  /**
+   * stores a positive long into an int (assuming it fits)
+   * @param value
+   * @return
+   */
+  private int positiveLongToInt(long value) {
+    if (!ColumnChunkMetaData.positiveLongFitsInAnInt(value)) {
+      throw new IllegalArgumentException("value should be positive and fit in an int: " + value);
+    }
+    return (int)(value + Integer.MIN_VALUE);
+  }
+
+  /**
+   * turns the int back into a positive long
+   * @param value
+   * @return
+   */
+  private long intToPositiveLong(int value) {
+    return (long)value - Integer.MIN_VALUE;
+  }
+
+  /**
+   * @return start of the column data offset
+   */
+  public long getFirstDataPageOffset() {
+    return intToPositiveLong(firstDataPage);
+  }
+
+  /**
+   * @return the location of the dictionary page if any
+   */
+  public long getDictionaryPageOffset() {
+    return intToPositiveLong(dictionaryPageOffset);
+  }
+
+  /**
+   * @return count of values in this block of the column
+   */
+  public long getValueCount() {
+    return intToPositiveLong(valueCount);
+  }
+
+  /**
+   * @return the totalUncompressedSize
+   */
+  public long getTotalUncompressedSize() {
+    return intToPositiveLong(totalUncompressedSize);
+  }
+
+  /**
+   * @return the totalSize
+   */
+  public long getTotalSize() {
+    return intToPositiveLong(totalSize);
+  }
+
+  /**
+   * @return the stats for this column
+   */
+  public Statistics getStatistics() {
+   return statistics;
+  }
+}
+/**
+ * Column chunk metadata whose five numeric properties are kept as plain longs.
+ */
+class LongColumnChunkMetaData extends ColumnChunkMetaData {
+
+  private final long firstDataPageOffset;
+  private final long dictionaryPageOffset;
+  private final long valueCount;
+  private final long totalSize;
+  private final long totalUncompressedSize;
+  private final Statistics statistics;
+
+  /**
+   * @param path column identifier
+   * @param type type of the column
+   * @param codec compression codec of the chunk
+   * @param encodings all the encodings used in this column
+   * @param statistics the stats for this column
+   * @param firstDataPageOffset offset of the first data page
+   * @param dictionaryPageOffset location of the dictionary page if any
+   * @param valueCount count of values in this block of the column
+   * @param totalSize the total size
+   * @param totalUncompressedSize the total uncompressed size
+   */
+  LongColumnChunkMetaData(
+      ColumnPath path,
+      PrimitiveTypeName type,
+      CompressionCodecName codec,
+      Set<Encoding> encodings,
+      Statistics statistics,
+      long firstDataPageOffset,
+      long dictionaryPageOffset,
+      long valueCount,
+      long totalSize,
+      long totalUncompressedSize) {
+    super(ColumnChunkProperties.get(path, type, codec, encodings));
+    this.statistics = statistics;
+    this.firstDataPageOffset = firstDataPageOffset;
+    this.dictionaryPageOffset = dictionaryPageOffset;
+    this.valueCount = valueCount;
+    this.totalSize = totalSize;
+    this.totalUncompressedSize = totalUncompressedSize;
+  }
+
+  /**
+   * @return start of the column data offset
+   */
+  @Override
+  public long getFirstDataPageOffset() {
+    return this.firstDataPageOffset;
+  }
+
+  /**
+   * @return the location of the dictionary page if any
+   */
+  @Override
+  public long getDictionaryPageOffset() {
+    return this.dictionaryPageOffset;
+  }
+
+  /**
+   * @return count of values in this block of the column
+   */
+  @Override
+  public long getValueCount() {
+    return this.valueCount;
+  }
+
+  /**
+   * @return the total uncompressed size
+   */
+  @Override
+  public long getTotalUncompressedSize() {
+    return this.totalUncompressedSize;
+  }
+
+  /**
+   * @return the total size
+   */
+  @Override
+  public long getTotalSize() {
+    return this.totalSize;
+  }
+
+  /**
+   * @return the stats for this column
+   */
+  @Override
+  public Statistics getStatistics() {
+    return this.statistics;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
new file mode 100644
index 0000000..5e26675
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkProperties.java
@@ -0,0 +1,89 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+public class ColumnChunkProperties {
+
+  private static Canonicalizer<ColumnChunkProperties> properties = new Canonicalizer<ColumnChunkProperties>();
+
+  public static ColumnChunkProperties get(ColumnPath path, PrimitiveTypeName type, CompressionCodecName codec, Set<Encoding> encodings) {
+    return properties.canonicalize(new ColumnChunkProperties(codec, path, type, encodings));
+  }
+
+  private final CompressionCodecName codec;
+  private final ColumnPath path;
+  private final PrimitiveTypeName type;
+  private final Set<Encoding> encodings;
+
+  private ColumnChunkProperties(CompressionCodecName codec,
+                                ColumnPath path,
+                                PrimitiveTypeName type,
+                                Set<Encoding> encodings) {
+    super();
+    this.codec = codec;
+    this.path = path;
+    this.type = type;
+    this.encodings = encodings;
+  }
+
+  public CompressionCodecName getCodec() {
+    return codec;
+  }
+
+  public ColumnPath getPath() {
+    return path;
+  }
+
+  public PrimitiveTypeName getType() {
+    return type;
+  }
+
+  public Set<Encoding> getEncodings() {
+    return encodings;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ColumnChunkProperties) {
+      ColumnChunkProperties other = (ColumnChunkProperties)obj;
+      return other.codec == codec && other.path.equals(path) && other.type == type && equals(other.encodings, encodings);
+    }
+    return false;
+  }
+
+  private boolean equals(Set<Encoding> a, Set<Encoding> b) {
+    return a.size() == b.size() && a.containsAll(b);
+  }
+
+  @Override
+  public int hashCode() {
+    return codec.hashCode() ^ path.hashCode() ^ type.hashCode() ^ Arrays.hashCode(encodings.toArray());
+  }
+
+  @Override
+  public String toString() {
+    return codec + " " + path + " " + type + "  " + encodings;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
new file mode 100644
index 0000000..558bea7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/CompressionCodecName.java
@@ -0,0 +1,93 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.hadoop.codec.CompressionCodecNotSupportedException;
+
+public enum CompressionCodecName {
+  UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
+  SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"),
+  GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
+  LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo");
+
+  public static CompressionCodecName fromConf(String name) {
+     if (name == null) {
+       return UNCOMPRESSED;
+     }
+     return valueOf(name.toUpperCase());
+  }
+
+  public static CompressionCodecName fromCompressionCodec(Class<?> clazz) {
+    if (clazz == null) {
+      return UNCOMPRESSED;
+    }
+    String name = clazz.getName();
+    for (CompressionCodecName codec : CompressionCodecName.values()) {
+      if (name.equals(codec.getHadoopCompressionCodecClassName())) {
+        return codec;
+      }
+    }
+    throw new CompressionCodecNotSupportedException(clazz);
+  }
+
+  public static CompressionCodecName fromParquet(CompressionCodec codec) {
+    for (CompressionCodecName codecName : CompressionCodecName.values()) {
+      if (codec.equals(codecName.parquetCompressionCodec)) {
+        return codecName;
+      }
+    }
+    throw new IllegalArgumentException("Unknown compression codec " + codec);
+  }
+
+  private final String hadoopCompressionCodecClass;
+  private final CompressionCodec parquetCompressionCodec;
+  private final String extension;
+
+  private CompressionCodecName(String hadoopCompressionCodecClass, CompressionCodec parquetCompressionCodec, String extension) {
+    this.hadoopCompressionCodecClass = hadoopCompressionCodecClass;
+    this.parquetCompressionCodec = parquetCompressionCodec;
+    this.extension = extension;
+  }
+
+  public String getHadoopCompressionCodecClassName() {
+    return hadoopCompressionCodecClass;
+  }
+
+  public Class getHadoopCompressionCodecClass() {
+    String codecClassName = getHadoopCompressionCodecClassName();
+    if (codecClassName==null) {
+      return null;
+    }
+    try {
+      return Class.forName(codecClassName);
+    } catch (ClassNotFoundException e) {
+      return null;
+    }
+  }
+
+  public CompressionCodec getParquetCompressionCodec() {
+    return parquetCompressionCodec;
+  }
+
+  public String getExtension() {
+    return extension;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java
new file mode 100644
index 0000000..ef073c6
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/EncodingList.java
@@ -0,0 +1,81 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.parquet.column.Encoding;
+
+public class EncodingList implements Iterable<Encoding> {
+
+  private static Canonicalizer<EncodingList> encodingLists = new Canonicalizer<EncodingList>();
+
+  public static EncodingList getEncodingList(List<Encoding> encodings) {
+    return encodingLists.canonicalize(new EncodingList(encodings));
+  }
+
+  private final List<Encoding> encodings;
+
+  private EncodingList(List<Encoding> encodings) {
+    super();
+    this.encodings = Collections.unmodifiableList(encodings);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof EncodingList) {
+      List<org.apache.parquet.column.Encoding> other = ((EncodingList)obj).encodings;
+      final int size = other.size();
+      if (size != encodings.size()) {
+        return false;
+      }
+      for (int i = 0; i < size; i++) {
+        if (!other.get(i).equals(encodings.get(i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = 1;
+    for (org.apache.parquet.column.Encoding element : encodings)
+      result = 31 * result + (element == null ? 0 : element.hashCode());
+    return result;
+  }
+
+  public List<Encoding> toList() {
+    return encodings;
+  }
+
+  @Override
+  public Iterator<Encoding> iterator() {
+    return encodings.iterator();
+  }
+
+  public int size() {
+    return encodings.size();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
new file mode 100644
index 0000000..6135d58
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/FileMetaData.java
@@ -0,0 +1,83 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import static java.util.Collections.unmodifiableMap;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.parquet.schema.MessageType;
+
+
+/**
+ * File level meta data (schema, key/value metadata, created-by, ...)
+ *
+ * @author Julien Le Dem
+ *
+ */
+public final class FileMetaData implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageType schema;
+
+  private final Map<String, String> keyValueMetaData;
+
+  private final String createdBy;
+
+  /**
+   * @param schema the schema for the file
+   * @param keyValueMetaData the app specific metadata
+   * @param createdBy the description of the library that created the file
+   */
+  public FileMetaData(MessageType schema, Map<String, String> keyValueMetaData, String createdBy) {
+    super();
+    this.schema = checkNotNull(schema, "schema");
+    this.keyValueMetaData = unmodifiableMap(checkNotNull(keyValueMetaData, "keyValueMetaData"));
+    this.createdBy = createdBy;
+  }
+
+  /**
+   * @return the schema for the file
+   */
+  public MessageType getSchema() {
+    return schema;
+  }
+
+  @Override
+  public String toString() {
+    return "FileMetaData{schema: "+schema+ ", metadata: " + keyValueMetaData + "}";
+  }
+
+  /**
+   * @return meta data for extensions
+   */
+  public Map<String, String> getKeyValueMetaData() {
+    return keyValueMetaData;
+  }
+
+  /**
+   * @return the description of the library that created the file
+   */
+  public String getCreatedBy() {
+    return createdBy;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
new file mode 100644
index 0000000..677ef03
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/GlobalMetaData.java
@@ -0,0 +1,106 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import static java.util.Collections.unmodifiableMap;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Merged metadata when reading from multiple files.
+ * This is to allow schema evolution
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class GlobalMetaData implements Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final MessageType schema;
+
+  private final Map<String, Set<String>> keyValueMetaData;
+
+  private final Set<String> createdBy;
+
+  /**
+   * @param schema the union of the schemas for all the files
+   * @param keyValueMetaData the merged app specific metadata
+   * @param createdBy the description of the library that created the file
+   */
+  public GlobalMetaData(MessageType schema, Map<String, Set<String>> keyValueMetaData, Set<String> createdBy) {
+    super();
+    this.schema = checkNotNull(schema, "schema");
+    this.keyValueMetaData = unmodifiableMap(checkNotNull(keyValueMetaData, "keyValueMetaData"));
+    this.createdBy = createdBy;
+  }
+
+  /**
+   * @return the schema for the file
+   */
+  public MessageType getSchema() {
+    return schema;
+  }
+
+  @Override
+  public String toString() {
+    return "GlobalMetaData{schema: "+schema+ ", metadata: " + keyValueMetaData + "}";
+  }
+
+  /**
+   * @return meta data for extensions
+   */
+  public Map<String, Set<String>> getKeyValueMetaData() {
+    return keyValueMetaData;
+  }
+
+  /**
+   * @return the description of the library that created the file
+   */
+  public Set<String> getCreatedBy() {
+    return createdBy;
+  }
+
+  /**
+   * Will merge the metadata as if it was coming from a single file.
+   * (for all part files written together this will always work)
+   * If there are conflicting values an exception will be thrown
+   * @return the merged version of this
+   */
+  public FileMetaData merge() {
+    String createdByString = createdBy.size() == 1 ?
+      createdBy.iterator().next() :
+      createdBy.toString();
+    Map<String, String> mergedKeyValues = new HashMap<String, String>();
+    for (Entry<String, Set<String>> entry : keyValueMetaData.entrySet()) {
+      if (entry.getValue().size() > 1) {
+        throw new RuntimeException("could not merge metadata: key " + entry.getKey() + " has conflicting values: " + entry.getValue());
+      }
+      mergedKeyValues.put(entry.getKey(), entry.getValue().iterator().next());
+    }
+    return new FileMetaData(schema, mergedKeyValues, createdByString);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
new file mode 100644
index 0000000..d35582a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
@@ -0,0 +1,132 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.metadata;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.util.List;
+
+import org.codehaus.jackson.JsonGenerationException;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig.Feature;
+
+/**
+ * Meta Data block stored in the footer of the file
+ * contains file level (Codec, Schema, ...) and block level (location, columns, record count, ...) meta data
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetMetadata {
+
+  private static ObjectMapper objectMapper = new ObjectMapper();
+  private static ObjectMapper prettyObjectMapper = new ObjectMapper();
+  static {
+    prettyObjectMapper.configure(Feature.INDENT_OUTPUT, true);
+  }
+
+  /**
+   *
+   * @param parquetMetaData
+   * @return the json representation
+   */
+  public static String toJSON(ParquetMetadata parquetMetaData) {
+    return toJSON(parquetMetaData, objectMapper);
+  }
+
+  /**
+   *
+   * @param parquetMetaData
+   * @return the pretty printed json representation
+   */
+  public static String toPrettyJSON(ParquetMetadata parquetMetaData) {
+    return toJSON(parquetMetaData, prettyObjectMapper);
+  }
+
+  private static String toJSON(ParquetMetadata parquetMetaData, ObjectMapper mapper) {
+    StringWriter stringWriter = new StringWriter();
+    try {
+      mapper.writeValue(stringWriter, parquetMetaData);
+    } catch (JsonGenerationException e) {
+      throw new RuntimeException(e);
+    } catch (JsonMappingException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return stringWriter.toString();
+  }
+
+  /**
+   *
+   * @param json the json representation
+   * @return the parsed object
+   */
+  public static ParquetMetadata fromJSON(String json) {
+    try {
+      return objectMapper.readValue(new StringReader(json), ParquetMetadata.class);
+    } catch (JsonParseException e) {
+      throw new RuntimeException(e);
+    } catch (JsonMappingException e) {
+      throw new RuntimeException(e);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private final FileMetaData fileMetaData;
+  private final List<BlockMetaData> blocks;
+
+  /**
+   *
+   * @param fileMetaData file level metadata
+   * @param blocks block level metadata
+   * @param keyValueMetaData
+   */
+  public ParquetMetadata(FileMetaData fileMetaData, List<BlockMetaData> blocks) {
+    this.fileMetaData = fileMetaData;
+    this.blocks = blocks;
+  }
+
+  /**
+   *
+   * @return block level metadata
+   */
+  public List<BlockMetaData> getBlocks() {
+    return blocks;
+  }
+
+  /**
+   *
+   * @return file level meta data
+   */
+  public FileMetaData getFileMetaData() {
+    return fileMetaData;
+  }
+
+
+  @Override
+  public String toString() {
+    return "ParquetMetaData{"+fileMetaData+", blocks: "+blocks+"}";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java
new file mode 100644
index 0000000..d319812
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/package-info.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ *
+ * <p>
+ * Provides classes to store and use Parquet files in Hadoop
+ *
+ * In a map reduce job:
+ * @see org.apache.parquet.hadoop.ParquetInputFormat
+ * @see org.apache.parquet.hadoop.ParquetOutputFormat
+ *
+ * In a standalone java app:
+ * @see org.apache.parquet.hadoop.ParquetWriter
+ * @see org.apache.parquet.hadoop.ParquetReader
+ *
+ * </p>
+ */
+package org.apache.parquet.hadoop;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
new file mode 100644
index 0000000..7f39cd7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ConfigurationUtil.java
@@ -0,0 +1,44 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.BadConfigurationException;
+
+public class ConfigurationUtil {
+
+  public static Class<?> getClassFromConfig(Configuration configuration, String configName, Class<?> assignableFrom) {
+    final String className = configuration.get(configName);
+    if (className == null) {
+      return null;
+    }
+    
+    try {
+      final Class<?> foundClass = configuration.getClassByName(className);	
+      if (!assignableFrom.isAssignableFrom(foundClass)) {
+        throw new BadConfigurationException("class " + className + " set in job conf at "
+                + configName + " is not a subclass of " + assignableFrom.getCanonicalName());
+      }
+      return foundClass;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("could not instantiate class " + className + " set in job conf at " + configName, e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
new file mode 100644
index 0000000..106fb0c
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ContextUtil.java
@@ -0,0 +1,275 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MapContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+/*
+ * This is based on ContextFactory.java from hadoop-2.0.x sources.
+ */
+
+/**
+ * Utility methods to allow applications to deal with inconsistencies in the
+ * MapReduce Context Objects API between hadoop-0.20 and later versions.
+ */
+public class ContextUtil {
+
+  private static final boolean useV21;
+
+  private static final Constructor<?> JOB_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> TASK_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> MAP_CONTEXT_CONSTRUCTOR;
+  private static final Constructor<?> MAP_CONTEXT_IMPL_CONSTRUCTOR;
+  private static final Constructor<?> GENERIC_COUNTER_CONSTRUCTOR;
+
+  private static final Field READER_FIELD;
+  private static final Field WRITER_FIELD;
+  private static final Field OUTER_MAP_FIELD;
+  private static final Field WRAPPED_CONTEXT_FIELD;
+
+  private static final Method GET_CONFIGURATION_METHOD;
+  private static final Method GET_COUNTER_METHOD;
+  private static final Method INCREMENT_COUNTER_METHOD;
+
+  static {
+    boolean v21 = true;
+    final String PACKAGE = "org.apache.hadoop.mapreduce";
+    try {
+      Class.forName(PACKAGE + ".task.JobContextImpl");
+    } catch (ClassNotFoundException cnfe) {
+      v21 = false;
+    }
+    useV21 = v21;
+    Class<?> jobContextCls;
+    Class<?> taskContextCls;
+    Class<?> taskIOContextCls;
+    Class<?> mapCls;
+    Class<?> mapContextCls;
+    Class<?> innerMapContextCls;
+    Class<?> genericCounterCls;
+    try {
+      if (v21) {
+        jobContextCls =
+            Class.forName(PACKAGE+".task.JobContextImpl");
+        taskContextCls =
+            Class.forName(PACKAGE+".task.TaskAttemptContextImpl");
+        taskIOContextCls =
+            Class.forName(PACKAGE+".task.TaskInputOutputContextImpl");
+        mapContextCls = Class.forName(PACKAGE + ".task.MapContextImpl");
+        mapCls = Class.forName(PACKAGE + ".lib.map.WrappedMapper");
+        innerMapContextCls =
+            Class.forName(PACKAGE+".lib.map.WrappedMapper$Context");
+        genericCounterCls = Class.forName(PACKAGE+".counters.GenericCounter");
+      } else {
+        jobContextCls =
+            Class.forName(PACKAGE+".JobContext");
+        taskContextCls =
+            Class.forName(PACKAGE+".TaskAttemptContext");
+        taskIOContextCls =
+            Class.forName(PACKAGE+".TaskInputOutputContext");
+        mapContextCls = Class.forName(PACKAGE + ".MapContext");
+        mapCls = Class.forName(PACKAGE + ".Mapper");
+        innerMapContextCls =
+            Class.forName(PACKAGE+".Mapper$Context");
+        genericCounterCls =
+            Class.forName("org.apache.hadoop.mapred.Counters$Counter");
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class", e);
+    }
+    try {
+      JOB_CONTEXT_CONSTRUCTOR =
+          jobContextCls.getConstructor(Configuration.class, JobID.class);
+      JOB_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      TASK_CONTEXT_CONSTRUCTOR =
+          taskContextCls.getConstructor(Configuration.class,
+              TaskAttemptID.class);
+      TASK_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      GENERIC_COUNTER_CONSTRUCTOR =
+          genericCounterCls.getDeclaredConstructor(String.class,
+              String.class,
+              Long.TYPE);
+      GENERIC_COUNTER_CONSTRUCTOR.setAccessible(true);
+
+      if (useV21) {
+        MAP_CONTEXT_CONSTRUCTOR =
+            innerMapContextCls.getConstructor(mapCls,
+                MapContext.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR =
+            mapContextCls.getDeclaredConstructor(Configuration.class,
+                TaskAttemptID.class,
+                RecordReader.class,
+                RecordWriter.class,
+                OutputCommitter.class,
+                StatusReporter.class,
+                InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR.setAccessible(true);
+        WRAPPED_CONTEXT_FIELD =
+            innerMapContextCls.getDeclaredField("mapContext");
+        WRAPPED_CONTEXT_FIELD.setAccessible(true);
+        Method get_counter_method;
+        try {
+          get_counter_method = Class.forName(PACKAGE + ".TaskAttemptContext").getMethod("getCounter", String.class,
+                  String.class);
+        } catch (Exception e) {
+          get_counter_method = Class.forName(PACKAGE + ".TaskInputOutputContext").getMethod("getCounter",
+                  String.class, String.class);
+        }
+        GET_COUNTER_METHOD=get_counter_method;
+      } else {
+        MAP_CONTEXT_CONSTRUCTOR =
+            innerMapContextCls.getConstructor(mapCls,
+                Configuration.class,
+                TaskAttemptID.class,
+                RecordReader.class,
+                RecordWriter.class,
+                OutputCommitter.class,
+                StatusReporter.class,
+                InputSplit.class);
+        MAP_CONTEXT_IMPL_CONSTRUCTOR = null;
+        WRAPPED_CONTEXT_FIELD = null;
+        GET_COUNTER_METHOD=taskIOContextCls.getMethod("getCounter", String.class, String.class);
+      }
+      MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
+      READER_FIELD = mapContextCls.getDeclaredField("reader");
+      READER_FIELD.setAccessible(true);
+      WRITER_FIELD = taskIOContextCls.getDeclaredField("output");
+      WRITER_FIELD.setAccessible(true);
+      OUTER_MAP_FIELD = innerMapContextCls.getDeclaredField("this$0");
+      OUTER_MAP_FIELD.setAccessible(true);
+      GET_CONFIGURATION_METHOD = Class.forName(PACKAGE+".JobContext")
+          .getMethod("getConfiguration");
+      INCREMENT_COUNTER_METHOD = Class.forName(PACKAGE+".Counter")
+              .getMethod("increment", Long.TYPE);
+    } catch (SecurityException e) {
+      throw new IllegalArgumentException("Can't run constructor ", e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Can't find constructor ", e);
+    } catch (NoSuchFieldException e) {
+      throw new IllegalArgumentException("Can't find field ", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class", e);
+    }
+  }
+
+  /**
+   * Builds a {@link JobContext} reflectively, through the constructor that the
+   * static initializer of this class selected based on the Hadoop version.
+   *
+   * @param conf the configuration passed to the constructor
+   * @param jobId the job id passed to the constructor; could be null
+   * @return the newly created context
+   * @throws IllegalArgumentException if the reflective construction fails
+   */
+  public static JobContext newJobContext(Configuration conf, JobID jobId) {
+    final Object created;
+    try {
+      created = JOB_CONTEXT_CONSTRUCTOR.newInstance(conf, jobId);
+    } catch (InvocationTargetException failure) {
+      throw new IllegalArgumentException("Can't instantiate JobContext", failure);
+    } catch (IllegalAccessException failure) {
+      throw new IllegalArgumentException("Can't instantiate JobContext", failure);
+    } catch (InstantiationException failure) {
+      throw new IllegalArgumentException("Can't instantiate JobContext", failure);
+    }
+    return (JobContext) created;
+  }
+
+  /**
+   * Builds a {@link TaskAttemptContext} reflectively, through the constructor
+   * that the static initializer of this class selected based on the Hadoop
+   * version.
+   *
+   * @param conf the configuration passed to the constructor
+   * @param taskAttemptId the task attempt id passed to the constructor
+   * @return the newly created context
+   * @throws IllegalArgumentException if the reflective construction fails
+   */
+  public static TaskAttemptContext newTaskAttemptContext(
+      Configuration conf, TaskAttemptID taskAttemptId) {
+    final Object created;
+    try {
+      created = TASK_CONTEXT_CONSTRUCTOR.newInstance(conf, taskAttemptId);
+    } catch (InvocationTargetException failure) {
+      throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", failure);
+    } catch (IllegalAccessException failure) {
+      throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", failure);
+    } catch (InstantiationException failure) {
+      throw new IllegalArgumentException("Can't instantiate TaskAttemptContext", failure);
+    }
+    return (TaskAttemptContext) created;
+  }
+
+  /**
+   * Creates a counter reflectively through the constructor resolved in the
+   * static initializer of this class.
+   *
+   * @param name the first constructor argument
+   * @param displayName the second constructor argument
+   * @param value the third constructor argument
+   * @return with Hadoop 2 : <code>new GenericCounter(args)</code>,<br>
+   *         with Hadoop 1 : <code>new Counter(args)</code>
+   * @throws IllegalArgumentException if the reflective construction fails
+   */
+  public static Counter newGenericCounter(String name, String displayName, long value) {
+    final Object created;
+    try {
+      created = GENERIC_COUNTER_CONSTRUCTOR.newInstance(name, displayName, value);
+    } catch (InvocationTargetException failure) {
+      throw new IllegalArgumentException("Can't instantiate Counter", failure);
+    } catch (IllegalAccessException failure) {
+      throw new IllegalArgumentException("Can't instantiate Counter", failure);
+    } catch (InstantiationException failure) {
+      throw new IllegalArgumentException("Can't instantiate Counter", failure);
+    }
+    return (Counter) created;
+  }
+
+  /**
+   * Calls <code>getConfiguration()</code> on a JobContext through reflection,
+   * so that the same code works with both Hadoop 1 and 2.
+   *
+   * @param context the context to read the configuration from
+   * @return the configuration returned by the context
+   * @throws IllegalArgumentException if the reflective call fails
+   */
+  public static Configuration getConfiguration(JobContext context) {
+    final Object result;
+    try {
+      result = GET_CONFIGURATION_METHOD.invoke(context);
+    } catch (InvocationTargetException failure) {
+      throw new IllegalArgumentException("Can't invoke method", failure);
+    } catch (IllegalAccessException failure) {
+      throw new IllegalArgumentException("Can't invoke method", failure);
+    }
+    return (Configuration) result;
+  }
+
+  /**
+   * Looks up a counter on the given context by calling its
+   * <code>getCounter(String, String)</code> method through reflection.
+   *
+   * @param context the context to ask for the counter
+   * @param groupName the counter group name
+   * @param counterName the counter name
+   * @return the counter returned by the context
+   * @throws IllegalArgumentException if the reflective call fails
+   */
+  public static Counter getCounter(TaskInputOutputContext context,
+                                   String groupName, String counterName) {
+    final Object found = invoke(GET_COUNTER_METHOD, context, groupName, counterName);
+    return (Counter) found;
+  }
+
+  /**
+   * Invokes a method and rethrows any exception as runtime exceptions.
+   */
+  private static Object invoke(Method method, Object obj, Object... args) {
+    try {
+      return method.invoke(obj, args);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't invoke method " + method.getName(), e);
+    }
+  }
+
+  /**
+   * Increments a counter by calling its <code>increment(long)</code> method
+   * through reflection.
+   *
+   * @param counter the counter to increment
+   * @param increment the amount passed to <code>increment</code>
+   * @throws IllegalArgumentException if the reflective call fails
+   */
+  public static void incrementCounter(Counter counter, long increment) {
+    final Long amount = Long.valueOf(increment);
+    invoke(INCREMENT_COUNTER_METHOD, counter, amount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
new file mode 100644
index 0000000..1817bb2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HiddenFileFilter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * {@link PathFilter} that rejects every path whose name starts with an
+ * underscore or a dot and accepts all other paths.
+ */
+public class HiddenFileFilter implements PathFilter {
+  /** The only instance; the constructor is private. */
+  public static final HiddenFileFilter INSTANCE = new HiddenFileFilter();
+
+  private HiddenFileFilter() {}
+
+  /**
+   * @param p the path to test
+   * @return false when the name of <code>p</code> begins with "_" or ".",
+   *         true otherwise
+   */
+  @Override
+  public boolean accept(Path p) {
+    final String name = p.getName();
+    if (name.startsWith("_")) {
+      return false;
+    }
+    return !name.startsWith(".");
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
new file mode 100644
index 0000000..ec413ac
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/SerializationUtil.java
@@ -0,0 +1,111 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.parquet.Closeables;
+import org.apache.parquet.Log;
+
+/**
+ * Serialization utils copied from:
+ * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/util/HadoopUtils.java
+ *
+ * TODO: Refactor elephant-bird so that we can depend on utils like this without extra baggage.
+ */
+public final class SerializationUtil {
+  private static final Log LOG = Log.getLog(SerializationUtil.class);
+
+  private SerializationUtil() { }
+
+  /**
+   * Writes an object into a configuration. The object is serialized with
+   * Java serialization, gzip-compressed, base64-encoded and stored as a
+   * string under the given key. Read it back with
+   * {@link #readObjectFromConfAsBase64}.
+   *
+   * @param key for the configuration
+   * @param obj the object to write
+   * @param conf to write to
+   * @throws IOException if serializing or compressing the object fails
+   */
+  public static void writeObjectToConfAsBase64(String key, Object obj, Configuration conf) throws IOException {
+    ByteArrayOutputStream rawBytes = null;
+    GZIPOutputStream zipped = null;
+    ObjectOutputStream objectOut = null;
+
+    try {
+      rawBytes = new ByteArrayOutputStream();
+      zipped = new GZIPOutputStream(rawBytes);
+      objectOut = new ObjectOutputStream(zipped);
+      objectOut.writeObject(obj);
+    } finally {
+      // The streams are closed before the buffer is read below, so that
+      // everything written through them has reached the byte array.
+      Closeables.close(objectOut);
+      Closeables.close(zipped);
+      Closeables.close(rawBytes);
+    }
+
+    byte[] encoded = Base64.encodeBase64(rawBytes.toByteArray());
+    conf.set(key, new String(encoded, "UTF-8"));
+  }
+
+  /**
+   * Reads an object (that was written using
+   * {@link #writeObjectToConfAsBase64}) from a configuration: the stored
+   * string is base64-decoded, gunzipped and deserialized.
+   *
+   * NOTE(review): this relies on Java native deserialization; confirm that
+   * the configuration values read here always come from a trusted source.
+   *
+   * @param key for the configuration
+   * @param conf to read from
+   * @return the read object, or null if key is not present in conf
+   * @throws IOException if the stored value cannot be decompressed or
+   *         deserialized
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T readObjectFromConfAsBase64(String key, Configuration conf) throws IOException {
+    String encoded = conf.get(key);
+    if (encoded == null) {
+      return null;
+    }
+
+    byte[] decoded = Base64.decodeBase64(encoded.getBytes("UTF-8"));
+
+    ByteArrayInputStream rawBytes = null;
+    GZIPInputStream unzipped = null;
+    ObjectInputStream objectIn = null;
+
+    try {
+      rawBytes = new ByteArrayInputStream(decoded);
+      unzipped = new GZIPInputStream(rawBytes);
+      objectIn = new ObjectInputStream(unzipped);
+      return (T) objectIn.readObject();
+    } catch (ClassCastException e) {
+      throw new IOException("Couldn't cast object read from config with key " + key, e);
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Could not read object from config with key " + key, e);
+    } finally {
+      Closeables.close(objectIn);
+      Closeables.close(unzipped);
+      Closeables.close(rawBytes);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
new file mode 100644
index 0000000..e537783
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/BenchmarkCounter.java
@@ -0,0 +1,114 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.parquet.hadoop.util.counters.mapred.MapRedCounterLoader;
+import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader;
+
+/**
+ * Encapsulate counter operations, compatible with Hadoop1/2, mapred/mapreduce API
+ *
+ * @author Tianshuo Deng
+ */
+public class BenchmarkCounter {
+
+  // Configuration flags that are handed to the counter loader, one per counter.
+  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+
+  // Group and names under which the counters are looked up.
+  private static final String COUNTER_GROUP_NAME = "parquet";
+  private static final String BYTES_READ_COUNTER_NAME = "bytesread";
+  private static final String BYTES_TOTAL_COUNTER_NAME = "bytestotal";
+  private static final String TIME_READ_COUNTER_NAME = "timeread";
+
+  // Until one of the init methods is called, every counter is a NullCounter.
+  private static ICounter bytesReadCounter = new NullCounter();
+  private static ICounter totalBytesCounter = new NullCounter();
+  private static ICounter timeCounter = new NullCounter();
+  private static CounterLoader counterLoader;
+
+  /**
+   * Init counters in hadoop's mapreduce API, support both 1.x and 2.x
+   *
+   * @param context the task context the counters are loaded from
+   */
+  public static void initCounterFromContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    counterLoader = new MapReduceCounterLoader(context);
+    loadCounters();
+  }
+
+  /**
+   * Init counters in hadoop's mapred API, which is used by cascading and Hive.
+   *
+   * @param reporter the reporter the counters are loaded from
+   * @param configuration the configuration handed to the counter loader
+   */
+  public static void initCounterFromReporter(Reporter reporter, Configuration configuration) {
+    counterLoader = new MapRedCounterLoader(reporter, configuration);
+    loadCounters();
+  }
+
+  /**
+   * Replaces the three counters with the ones the current loader returns
+   * for their name and flag.
+   */
+  private static void loadCounters() {
+    bytesReadCounter = counterLoader.getCounterByNameAndFlag(
+        COUNTER_GROUP_NAME, BYTES_READ_COUNTER_NAME, ENABLE_BYTES_READ_COUNTER);
+    totalBytesCounter = counterLoader.getCounterByNameAndFlag(
+        COUNTER_GROUP_NAME, BYTES_TOTAL_COUNTER_NAME, ENABLE_BYTES_TOTAL_COUNTER);
+    timeCounter = counterLoader.getCounterByNameAndFlag(
+        COUNTER_GROUP_NAME, TIME_READ_COUNTER_NAME, ENABLE_TIME_READ_COUNTER);
+  }
+
+  /** Increments the total bytes counter by val. */
+  public static void incrementTotalBytes(long val) {
+    totalBytesCounter.increment(val);
+  }
+
+  /** @return the count of the total bytes counter */
+  public static long getTotalBytes() {
+    return totalBytesCounter.getCount();
+  }
+
+  /** Increments the bytes read counter by val. */
+  public static void incrementBytesRead(long val) {
+    bytesReadCounter.increment(val);
+  }
+
+  /** @return the count of the bytes read counter */
+  public static long getBytesRead() {
+    return bytesReadCounter.getCount();
+  }
+
+  /** Increments the time counter by val. */
+  public static void incrementTime(long val) {
+    timeCounter.increment(val);
+  }
+
+  /** @return the count of the time counter */
+  public static long getTime() {
+    return timeCounter.getCount();
+  }
+
+  /**
+   * Counter that ignores every increment and always reports a count of 0.
+   */
+  public static class NullCounter implements ICounter {
+    @Override
+    public void increment(long val) {
+      // intentionally empty: increments are discarded
+    }
+
+    @Override
+    public long getCount() {
+      return 0;
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java
new file mode 100644
index 0000000..0b9f92f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/CounterLoader.java
@@ -0,0 +1,28 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters;
+
+/**
+ * Factory interface for counters. An implementation loads the counter
+ * identified by groupName and counterName; if the flag named counterFlag is
+ * false in the configuration, the counter will not be loaded.
+ * @author Tianshuo Deng
+ */
+public interface CounterLoader {
+  ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag);
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java
new file mode 100644
index 0000000..c10b8a8
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/ICounter.java
@@ -0,0 +1,29 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters;
+
+/**
+ * Interface for counters in mapred/mapreduce package of hadoop
+ * @author Tianshuo Deng
+ */
+public interface ICounter {
+  /**
+   * Asks the counter to increment by the given amount.
+   *
+   * @param val the amount to increment by
+   */
+  void increment(long val);
+
+  /**
+   * @return the current count reported by the counter
+   */
+  long getCount();
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java
new file mode 100644
index 0000000..4377d44
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterAdapter.java
@@ -0,0 +1,44 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapred;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Adapt a mapred counter to ICounter
+ * @author Tianshuo Deng
+ */
+public class MapRedCounterAdapter implements ICounter {
+  /** The wrapped mapred counter that every call is forwarded to. */
+  private final Counters.Counter adaptee;
+
+  /**
+   * @param adaptee the mapred counter to wrap
+   */
+  public MapRedCounterAdapter(Counters.Counter adaptee) {
+    this.adaptee = adaptee;
+  }
+
+  /** Forwards to <code>increment</code> of the wrapped counter. */
+  @Override
+  public void increment(long val) {
+    adaptee.increment(val);
+  }
+
+  /** @return the value of <code>getCounter()</code> of the wrapped counter */
+  @Override
+  public long getCount() {
+    final long current = adaptee.getCounter();
+    return current;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java
new file mode 100644
index 0000000..0e5a32d
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapred/MapRedCounterLoader.java
@@ -0,0 +1,52 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.hadoop.util.counters.CounterLoader;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Concrete factory for counters in mapred API,
+ * get a counter using mapred API when the corresponding flag is set, otherwise return a NullCounter
+ * @author Tianshuo Deng
+ */
+public class MapRedCounterLoader implements CounterLoader {
+  private Reporter reporter;
+  private Configuration conf;
+
+  /**
+   * @param reporter the reporter the counters are looked up on
+   * @param conf the configuration the counter flags are read from
+   */
+  public MapRedCounterLoader(Reporter reporter, Configuration conf) {
+    this.reporter = reporter;
+    this.conf = conf;
+  }
+
+  /**
+   * Returns an adapter around the reporter's counter when the flag is set
+   * (a missing flag counts as true) and the reporter returns a counter;
+   * otherwise returns a NullCounter.
+   *
+   * @param groupName the counter group name passed to the reporter
+   * @param counterName the counter name passed to the reporter
+   * @param counterFlag the name of the boolean configuration flag that
+   *        enables this counter
+   * @return the adapted counter, or a NullCounter
+   */
+  @Override
+  public ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag) {
+    if (conf.getBoolean(counterFlag, true)) {
+      Counters.Counter counter = reporter.getCounter(groupName, counterName);
+      if (counter != null) {
+        // Wrap the instance that was just null-checked instead of asking the
+        // reporter a second time.
+        return new MapRedCounterAdapter(counter);
+      }
+    }
+    return new BenchmarkCounter.NullCounter();
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java
new file mode 100644
index 0000000..1339977
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterAdapter.java
@@ -0,0 +1,45 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapreduce;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Adapt a mapreduce counter to ICounter
+ * @author Tianshuo Deng
+ */
+public class MapReduceCounterAdapter implements ICounter {
+  /** The wrapped mapreduce counter. */
+  private final Counter adaptee;
+
+  /**
+   * @param adaptee the mapreduce counter to wrap
+   */
+  public MapReduceCounterAdapter(Counter adaptee) {
+    this.adaptee = adaptee;
+  }
+
+  /** Increments the wrapped counter through {@link ContextUtil#incrementCounter}. */
+  @Override
+  public void increment(long val) {
+    ContextUtil.incrementCounter(adaptee, val);
+  }
+
+  /**
+   * NOTE(review): increment goes through ContextUtil while getValue() is
+   * called directly on the counter; confirm the direct call is fine for
+   * every Hadoop version ContextUtil is meant to support.
+   *
+   * @return the value of <code>getValue()</code> of the wrapped counter
+   */
+  @Override
+  public long getCount() {
+    final long current = adaptee.getValue();
+    return current;
+  }
+}


Mime
View raw message