parquet-commits mailing list archives

From b...@apache.org
Subject [15/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:12 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
new file mode 100644
index 0000000..d85a231
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -0,0 +1,81 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.compat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
+import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator;
+import org.apache.parquet.filter2.statisticslevel.StatisticsFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.schema.MessageType;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Given a {@link Filter}, applies it to a list of BlockMetaData (row groups).
+ * If the Filter is an {@link org.apache.parquet.filter.UnboundRecordFilter} or the no-op filter,
+ * no filtering will be performed.
+ */
+public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
+  private final List<BlockMetaData> blocks;
+  private final MessageType schema;
+
+  public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
+    checkNotNull(filter, "filter");
+    return filter.accept(new RowGroupFilter(blocks, schema));
+  }
+
+  private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
+    this.blocks = checkNotNull(blocks, "blocks");
+    this.schema = checkNotNull(schema, "schema");
+  }
+
+  @Override
+  public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
+    FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
+
+    // check that the schema of the filter matches the schema of the file
+    SchemaCompatibilityValidator.validate(filterPredicate, schema);
+
+    List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();
+
+    for (BlockMetaData block : blocks) {
+      if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
+        filteredBlocks.add(block);
+      }
+    }
+
+    return filteredBlocks;
+  }
+
+  @Override
+  public List<BlockMetaData> visit(FilterCompat.UnboundRecordFilterCompat unboundRecordFilterCompat) {
+    return blocks;
+  }
+
+  @Override
+  public List<BlockMetaData> visit(NoOpFilter noOpFilter) {
+    return blocks;
+  }
+}
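
For context, a minimal sketch of how RowGroupFilter.filterRowGroups introduced above might be invoked when planning a read. The footer variable and the eq(intColumn("id"), 7) predicate are illustrative assumptions, not part of this commit.

import java.util.List;

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.intColumn;

public class RowGroupFilterSketch {
  // Keeps only the row groups that might contain records with id == 7.
  public static List<BlockMetaData> planRead(ParquetMetadata footer) {
    FilterPredicate pred = eq(intColumn("id"), 7);   // hypothetical column "id"
    Filter filter = FilterCompat.get(pred);
    MessageType schema = footer.getFileMetaData().getSchema();
    return RowGroupFilter.filterRowGroups(filter, footer.getBlocks(), schema);
  }
}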

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
new file mode 100644
index 0000000..7f2187a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -0,0 +1,305 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.statisticslevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.And;
+import org.apache.parquet.filter2.predicate.Operators.Column;
+import org.apache.parquet.filter2.predicate.Operators.Eq;
+import org.apache.parquet.filter2.predicate.Operators.Gt;
+import org.apache.parquet.filter2.predicate.Operators.GtEq;
+import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import org.apache.parquet.filter2.predicate.Operators.Lt;
+import org.apache.parquet.filter2.predicate.Operators.LtEq;
+import org.apache.parquet.filter2.predicate.Operators.Not;
+import org.apache.parquet.filter2.predicate.Operators.NotEq;
+import org.apache.parquet.filter2.predicate.Operators.Or;
+import org.apache.parquet.filter2.predicate.Operators.UserDefined;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+/**
+ * Applies a {@link org.apache.parquet.filter2.predicate.FilterPredicate} to statistics about a group of
+ * records.
+ *
+ * Note: the supplied predicate must not contain any instances of the not() operator, as this is not
+ * supported by this filter.
+ *
+ * The supplied predicate should first be run through {@link org.apache.parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
+ * into a form that doesn't make use of the not() operator.
+ *
+ * The supplied predicate should also have already been run through
+ * {@link org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator}
+ * to make sure it is compatible with the schema of this file.
+ *
+ * Returns true if all the records represented by the statistics in the provided column metadata can be dropped.
+ *         false otherwise (including when it is not known, which is often the case).
+ */
+// TODO: this belongs in the parquet-column project, but some of the classes here need to be moved too
+// TODO: (https://issues.apache.org/jira/browse/PARQUET-38)
+public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
+
+  public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns) {
+    checkNotNull(pred, "pred");
+    checkNotNull(columns, "columns");
+    return pred.accept(new StatisticsFilter(columns));
+  }
+
+  private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+
+  private StatisticsFilter(List<ColumnChunkMetaData> columnsList) {
+    for (ColumnChunkMetaData chunk : columnsList) {
+      columns.put(chunk.getPath(), chunk);
+    }
+  }
+
+  private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+    ColumnChunkMetaData c = columns.get(columnPath);
+    checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
+    return c;
+  }
+
+  // is this column chunk composed entirely of nulls?
+  // assumes the column chunk's statistics is not empty
+  private boolean isAllNulls(ColumnChunkMetaData column) {
+    return column.getStatistics().getNumNulls() == column.getValueCount();
+  }
+
+  // are there any nulls in this column chunk?
+  // assumes the column chunk's statistics is not empty
+  private boolean hasNulls(ColumnChunkMetaData column) {
+    return column.getStatistics().getNumNulls() > 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
+    Column<T> filterColumn = eq.getColumn();
+    T value = eq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return false;
+    }
+
+    if (value == null) {
+      // we are looking for records where v eq(null)
+      // so drop if there are no nulls in this chunk
+      return !hasNulls(columnChunk);
+    }
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v eq(someNonNull)
+      // and this is a column of all nulls, so drop it
+      return true;
+    }
+
+    // drop if value < min || value > max
+    return value.compareTo(stats.genericGetMin()) < 0 || value.compareTo(stats.genericGetMax()) > 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
+    Column<T> filterColumn = notEq.getColumn();
+    T value = notEq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return false;
+    }
+
+    if (value == null) {
+      // we are looking for records where v notEq(null)
+      // so, if this is a column of all nulls, we can drop it
+      return isAllNulls(columnChunk);
+    }
+
+    if (hasNulls(columnChunk)) {
+      // we are looking for records where v notEq(someNonNull)
+      // but this chunk contains nulls, we cannot drop it
+      return false;
+    }
+
+    // drop if this is a column where min = max = value
+    return value.compareTo(stats.genericGetMin()) == 0 && value.compareTo(stats.genericGetMax()) == 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
+    Column<T> filterColumn = lt.getColumn();
+    T value = lt.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return false;
+    }
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v < someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    // drop if value <= min
+    return value.compareTo(stats.genericGetMin()) <= 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
+    Column<T> filterColumn = ltEq.getColumn();
+    T value = ltEq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return false;
+    }
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v <= someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    // drop if value < min
+    return value.compareTo(stats.genericGetMin()) < 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
+    Column<T> filterColumn = gt.getColumn();
+    T value = gt.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return false;
+    }
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v > someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    // drop if value >= max
+    return value.compareTo(stats.genericGetMax()) >= 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
+    Column<T> filterColumn = gtEq.getColumn();
+    T value = gtEq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return false;
+    }
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v >= someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    // drop if value > max
+    return value.compareTo(stats.genericGetMax()) > 0;
+  }
+
+  @Override
+  public Boolean visit(And and) {
+    // It may seem unintuitive to use an || rather than an && here, but we
+    // can drop a chunk of records if either the left or the right predicate
+    // alone guarantees that, no matter what, we don't need this chunk.
+    return and.getLeft().accept(this) || and.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Or or) {
+    // It may seem unintuitive to use an && rather than an || here,
+    // but we can only drop a chunk of records if both the left and
+    // the right predicates agree that, no matter what,
+    // we don't need this chunk.
+    return or.getLeft().accept(this) && or.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Not not) {
+    throw new IllegalArgumentException(
+        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
+    Column<T> filterColumn = ud.getColumn();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    U udp = ud.getUserDefinedPredicate();
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (stats.isEmpty()) {
+      // we have no statistics available, we cannot drop any chunks
+      return false;
+    }
+
+    if (isAllNulls(columnChunk)) {
+      // there is no min max, there is nothing
+      // else we can say about this chunk, we
+      // cannot drop it.
+      return false;
+    }
+
+    org.apache.parquet.filter2.predicate.Statistics<T> udpStats =
+        new org.apache.parquet.filter2.predicate.Statistics<T>(stats.genericGetMin(), stats.genericGetMax());
+
+    if (inverted) {
+      return udp.inverseCanDrop(udpStats);
+    } else {
+      return udp.canDrop(udpStats);
+    }
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud) {
+    return visit(ud, false);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> lnud) {
+    return visit(lnud.getUserDefined(), true);
+  }
+
+}
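
As a plain-Java illustration of the drop rules above (a standalone sketch, not parquet code): a row group can be dropped for eq(v) only when v lies outside [min, max], for and() when either side alone can drop it, and for or() only when both sides can.

public class StatisticsDropSketch {
  // Mirrors visit(Eq): with non-empty stats and a non-null value,
  // drop when the value cannot be inside [min, max].
  static boolean canDropEq(long value, long min, long max) {
    return value < min || value > max;
  }

  // Mirrors visit(And): one impossible side makes the conjunction impossible.
  static boolean canDropAnd(boolean canDropLeft, boolean canDropRight) {
    return canDropLeft || canDropRight;
  }

  // Mirrors visit(Or): drop only when both alternatives are impossible.
  static boolean canDropOr(boolean canDropLeft, boolean canDropRight) {
    return canDropLeft && canDropRight;
  }

  public static void main(String[] args) {
    System.out.println(canDropEq(7, 10, 20));    // true: 7 < min, so no record can match
    System.out.println(canDropAnd(true, false)); // true
    System.out.println(canDropOr(true, false));  // false
  }
}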

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
new file mode 100644
index 0000000..859ec21
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -0,0 +1,735 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.format.converter;
+
+import static org.apache.parquet.format.Util.readFileMetaData;
+import static org.apache.parquet.format.Util.writePageHeader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.format.ColumnChunk;
+import org.apache.parquet.format.ColumnMetaData;
+import org.apache.parquet.format.ConvertedType;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.Encoding;
+import org.apache.parquet.format.FieldRepetitionType;
+import org.apache.parquet.format.FileMetaData;
+import org.apache.parquet.format.KeyValue;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.PageType;
+import org.apache.parquet.format.RowGroup;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.format.Statistics;
+import org.apache.parquet.format.Type;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type.Repetition;
+import org.apache.parquet.schema.TypeVisitor;
+import org.apache.parquet.schema.Types;
+
+public class ParquetMetadataConverter {
+  private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
+
+  public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
+    List<BlockMetaData> blocks = parquetMetadata.getBlocks();
+    List<RowGroup> rowGroups = new ArrayList<RowGroup>();
+    int numRows = 0;
+    for (BlockMetaData block : blocks) {
+      numRows += block.getRowCount();
+      addRowGroup(parquetMetadata, rowGroups, block);
+    }
+    FileMetaData fileMetaData = new FileMetaData(
+        currentVersion,
+        toParquetSchema(parquetMetadata.getFileMetaData().getSchema()),
+        numRows,
+        rowGroups);
+
+    Set<Entry<String, String>> keyValues = parquetMetadata.getFileMetaData().getKeyValueMetaData().entrySet();
+    for (Entry<String, String> keyValue : keyValues) {
+      addKeyValue(fileMetaData, keyValue.getKey(), keyValue.getValue());
+    }
+
+    fileMetaData.setCreated_by(parquetMetadata.getFileMetaData().getCreatedBy());
+    return fileMetaData;
+  }
+
+  List<SchemaElement> toParquetSchema(MessageType schema) {
+    List<SchemaElement> result = new ArrayList<SchemaElement>();
+    addToList(result, schema);
+    return result;
+  }
+
+  private void addToList(final List<SchemaElement> result, org.apache.parquet.schema.Type field) {
+    field.accept(new TypeVisitor() {
+      @Override
+      public void visit(PrimitiveType primitiveType) {
+        SchemaElement element = new SchemaElement(primitiveType.getName());
+        element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition()));
+        element.setType(getType(primitiveType.getPrimitiveTypeName()));
+        if (primitiveType.getOriginalType() != null) {
+          element.setConverted_type(getConvertedType(primitiveType.getOriginalType()));
+        }
+        if (primitiveType.getDecimalMetadata() != null) {
+          element.setPrecision(primitiveType.getDecimalMetadata().getPrecision());
+          element.setScale(primitiveType.getDecimalMetadata().getScale());
+        }
+        if (primitiveType.getTypeLength() > 0) {
+          element.setType_length(primitiveType.getTypeLength());
+        }
+        result.add(element);
+      }
+
+      @Override
+      public void visit(MessageType messageType) {
+        SchemaElement element = new SchemaElement(messageType.getName());
+        visitChildren(result, messageType.asGroupType(), element);
+      }
+
+      @Override
+      public void visit(GroupType groupType) {
+        SchemaElement element = new SchemaElement(groupType.getName());
+        element.setRepetition_type(toParquetRepetition(groupType.getRepetition()));
+        if (groupType.getOriginalType() != null) {
+          element.setConverted_type(getConvertedType(groupType.getOriginalType()));
+        }
+        visitChildren(result, groupType, element);
+      }
+
+      private void visitChildren(final List<SchemaElement> result,
+          GroupType groupType, SchemaElement element) {
+        element.setNum_children(groupType.getFieldCount());
+        result.add(element);
+        for (org.apache.parquet.schema.Type field : groupType.getFields()) {
+          addToList(result, field);
+        }
+      }
+    });
+  }
+
+  private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
+    //rowGroup.total_byte_size = ;
+    List<ColumnChunkMetaData> columns = block.getColumns();
+    List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
+    for (ColumnChunkMetaData columnMetaData : columns) {
+      ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPageOffset()); // verify this is the right offset
+      columnChunk.file_path = block.getPath(); // they are in the same file for now
+      columnChunk.meta_data = new ColumnMetaData(
+          getType(columnMetaData.getType()),
+          toFormatEncodings(columnMetaData.getEncodings()),
+          Arrays.asList(columnMetaData.getPath().toArray()),
+          columnMetaData.getCodec().getParquetCompressionCodec(),
+          columnMetaData.getValueCount(),
+          columnMetaData.getTotalUncompressedSize(),
+          columnMetaData.getTotalSize(),
+          columnMetaData.getFirstDataPageOffset());
+      columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
+      if (!columnMetaData.getStatistics().isEmpty()) {
+        columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
+      }
+//      columnChunk.meta_data.index_page_offset = ;
+//      columnChunk.meta_data.key_value_metadata = ; // nothing yet
+
+      parquetColumns.add(columnChunk);
+    }
+    RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
+    rowGroups.add(rowGroup);
+  }
+
+  private List<Encoding> toFormatEncodings(Set<org.apache.parquet.column.Encoding> encodings) {
+    List<Encoding> converted = new ArrayList<Encoding>(encodings.size());
+    for (org.apache.parquet.column.Encoding encoding : encodings) {
+      converted.add(getEncoding(encoding));
+    }
+    return converted;
+  }
+
+  private static final class EncodingList {
+
+    private final Set<org.apache.parquet.column.Encoding> encodings;
+
+    public EncodingList(Set<org.apache.parquet.column.Encoding> encodings) {
+      this.encodings = encodings;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof EncodingList) {
+        Set<org.apache.parquet.column.Encoding> other = ((EncodingList)obj).encodings;
+        return other.size() == encodings.size() && encodings.containsAll(other);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = 1;
+      for (org.apache.parquet.column.Encoding element : encodings)
+        result = 31 * result + (element == null ? 0 : element.hashCode());
+      return result;
+    }
+  }
+
+  private Map<EncodingList, Set<org.apache.parquet.column.Encoding>> encodingLists = new HashMap<EncodingList, Set<org.apache.parquet.column.Encoding>>();
+
+  private Set<org.apache.parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
+    Set<org.apache.parquet.column.Encoding> converted = new HashSet<org.apache.parquet.column.Encoding>();
+    for (Encoding encoding : encodings) {
+      converted.add(getEncoding(encoding));
+    }
+    converted = Collections.unmodifiableSet(converted);
+    EncodingList key = new EncodingList(converted);
+    Set<org.apache.parquet.column.Encoding> cached = encodingLists.get(key);
+    if (cached == null) {
+      cached = converted;
+      encodingLists.put(key, cached);
+    }
+    return cached;
+  }
+
+  public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) {
+    return org.apache.parquet.column.Encoding.valueOf(encoding.name());
+  }
+
+  public Encoding getEncoding(org.apache.parquet.column.Encoding encoding) {
+    return Encoding.valueOf(encoding.name());
+  }
+
+  public static Statistics toParquetStatistics(org.apache.parquet.column.statistics.Statistics statistics) {
+    Statistics stats = new Statistics();
+    if (!statistics.isEmpty()) {
+      stats.setNull_count(statistics.getNumNulls());
+      if (statistics.hasNonNullValue()) {
+        stats.setMax(statistics.getMaxBytes());
+        stats.setMin(statistics.getMinBytes());
+      }
+    }
+    return stats;
+  }
+
+  public static org.apache.parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) {
+    // create stats object based on the column type
+    org.apache.parquet.column.statistics.Statistics stats = org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType(type);
+    // if no statistics were written to the footer, return the empty Statistics object created above
+    if (statistics != null) {
+      if (statistics.isSetMax() && statistics.isSetMin()) {
+        stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array());
+      }
+      stats.setNumNulls(statistics.null_count);
+    }
+    return stats;
+  }
+
+  public PrimitiveTypeName getPrimitive(Type type) {
+    switch (type) {
+      case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
+        return PrimitiveTypeName.BINARY;
+      case INT64:
+        return PrimitiveTypeName.INT64;
+      case INT32:
+        return PrimitiveTypeName.INT32;
+      case BOOLEAN:
+        return PrimitiveTypeName.BOOLEAN;
+      case FLOAT:
+        return PrimitiveTypeName.FLOAT;
+      case DOUBLE:
+        return PrimitiveTypeName.DOUBLE;
+      case INT96:
+        return PrimitiveTypeName.INT96;
+      case FIXED_LEN_BYTE_ARRAY:
+        return PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+      default:
+        throw new RuntimeException("Unknown type " + type);
+    }
+  }
+
+  Type getType(PrimitiveTypeName type) {
+    switch (type) {
+      case INT64:
+        return Type.INT64;
+      case INT32:
+        return Type.INT32;
+      case BOOLEAN:
+        return Type.BOOLEAN;
+      case BINARY:
+        return Type.BYTE_ARRAY;
+      case FLOAT:
+        return Type.FLOAT;
+      case DOUBLE:
+        return Type.DOUBLE;
+      case INT96:
+        return Type.INT96;
+      case FIXED_LEN_BYTE_ARRAY:
+        return Type.FIXED_LEN_BYTE_ARRAY;
+      default:
+        throw new RuntimeException("Unknown primitive type " + type);
+    }
+  }
+
+  OriginalType getOriginalType(ConvertedType type) {
+    switch (type) {
+      case UTF8:
+        return OriginalType.UTF8;
+      case MAP:
+        return OriginalType.MAP;
+      case MAP_KEY_VALUE:
+        return OriginalType.MAP_KEY_VALUE;
+      case LIST:
+        return OriginalType.LIST;
+      case ENUM:
+        return OriginalType.ENUM;
+      case DECIMAL:
+        return OriginalType.DECIMAL;
+      case DATE:
+        return OriginalType.DATE;
+      case TIME_MILLIS:
+        return OriginalType.TIME_MILLIS;
+      case TIMESTAMP_MILLIS:
+        return OriginalType.TIMESTAMP_MILLIS;
+      case INTERVAL:
+        return OriginalType.INTERVAL;
+      case INT_8:
+        return OriginalType.INT_8;
+      case INT_16:
+        return OriginalType.INT_16;
+      case INT_32:
+        return OriginalType.INT_32;
+      case INT_64:
+        return OriginalType.INT_64;
+      case UINT_8:
+        return OriginalType.UINT_8;
+      case UINT_16:
+        return OriginalType.UINT_16;
+      case UINT_32:
+        return OriginalType.UINT_32;
+      case UINT_64:
+        return OriginalType.UINT_64;
+      case JSON:
+        return OriginalType.JSON;
+      case BSON:
+        return OriginalType.BSON;
+      default:
+        throw new RuntimeException("Unknown converted type " + type);
+    }
+  }
+
+  ConvertedType getConvertedType(OriginalType type) {
+    switch (type) {
+      case UTF8:
+        return ConvertedType.UTF8;
+      case MAP:
+        return ConvertedType.MAP;
+      case MAP_KEY_VALUE:
+        return ConvertedType.MAP_KEY_VALUE;
+      case LIST:
+        return ConvertedType.LIST;
+      case ENUM:
+        return ConvertedType.ENUM;
+      case DECIMAL:
+        return ConvertedType.DECIMAL;
+      case DATE:
+        return ConvertedType.DATE;
+      case TIME_MILLIS:
+        return ConvertedType.TIME_MILLIS;
+      case TIMESTAMP_MILLIS:
+        return ConvertedType.TIMESTAMP_MILLIS;
+      case INTERVAL:
+        return ConvertedType.INTERVAL;
+      case INT_8:
+        return ConvertedType.INT_8;
+      case INT_16:
+        return ConvertedType.INT_16;
+      case INT_32:
+        return ConvertedType.INT_32;
+      case INT_64:
+        return ConvertedType.INT_64;
+      case UINT_8:
+        return ConvertedType.UINT_8;
+      case UINT_16:
+        return ConvertedType.UINT_16;
+      case UINT_32:
+        return ConvertedType.UINT_32;
+      case UINT_64:
+        return ConvertedType.UINT_64;
+      case JSON:
+        return ConvertedType.JSON;
+      case BSON:
+        return ConvertedType.BSON;
+      default:
+        throw new RuntimeException("Unknown original type " + type);
+    }
+  }
+
+  private void addKeyValue(FileMetaData fileMetaData, String key, String value) {
+    KeyValue keyValue = new KeyValue(key);
+    keyValue.value = value;
+    fileMetaData.addToKey_value_metadata(keyValue);
+  }
+
+  private static interface MetadataFilterVisitor<T, E extends Throwable> {
+    T visit(NoFilter filter) throws E;
+    T visit(SkipMetadataFilter filter) throws E;
+    T visit(RangeMetadataFilter filter) throws E;
+  }
+
+  public abstract static class MetadataFilter {
+    private MetadataFilter() {}
+    abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
+  }
+  public static final MetadataFilter NO_FILTER = new NoFilter();
+  public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
+  /**
+   * [ startOffset, endOffset )
+   * @param startOffset inclusive start of the file offset range
+   * @param endOffset exclusive end of the file offset range
+   * @return a metadata filter that keeps only row groups whose midpoint falls in [startOffset, endOffset)
+   */
+  public static final MetadataFilter range(long startOffset, long endOffset) {
+    return new RangeMetadataFilter(startOffset, endOffset);
+  }
+  private static final class NoFilter extends MetadataFilter {
+    private NoFilter() {}
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    @Override
+    public String toString() {
+      return "NO_FILTER";
+    }
+  }
+  private static final class SkipMetadataFilter extends MetadataFilter {
+    private SkipMetadataFilter() {}
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    @Override
+    public String toString() {
+      return "SKIP_ROW_GROUPS";
+    }
+  }
+  /**
+   * [ startOffset, endOffset )
+   * @author Julien Le Dem
+   */
+  static final class RangeMetadataFilter extends MetadataFilter {
+    final long startOffset;
+    final long endOffset;
+    RangeMetadataFilter(long startOffset, long endOffset) {
+      super();
+      this.startOffset = startOffset;
+      this.endOffset = endOffset;
+    }
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+    boolean contains(long offset) {
+      return offset >= this.startOffset && offset < this.endOffset;
+    }
+    @Override
+    public String toString() {
+      return "range(s:" + startOffset + ", e:" + endOffset + ")";
+    }
+  }
+
+  @Deprecated
+  public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
+    return readParquetMetadata(from, NO_FILTER);
+  }
+
+  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
+    List<RowGroup> rowGroups = metaData.getRow_groups();
+    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+    for (RowGroup rowGroup : rowGroups) {
+      long totalSize = 0;
+      long startIndex = getOffset(rowGroup.getColumns().get(0));
+      for (ColumnChunk col : rowGroup.getColumns()) {
+        totalSize += col.getMeta_data().getTotal_compressed_size();
+      }
+      long midPoint = startIndex + totalSize / 2;
+      if (filter.contains(midPoint)) {
+        newRowGroups.add(rowGroup);
+      }
+    }
+    metaData.setRow_groups(newRowGroups);
+    return metaData;
+  }
+
+  static long getOffset(RowGroup rowGroup) {
+    return getOffset(rowGroup.getColumns().get(0));
+  }
+  static long getOffset(ColumnChunk columnChunk) {
+    ColumnMetaData md = columnChunk.getMeta_data();
+    long offset = md.getData_page_offset();
+    if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
+      offset = md.getDictionary_page_offset();
+    }
+    return offset;
+  }
+
+  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
+    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+      @Override
+      public FileMetaData visit(NoFilter filter) throws IOException {
+        return readFileMetaData(from);
+      }
+      @Override
+      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
+        return readFileMetaData(from, true);
+      }
+      @Override
+      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
+        return filterFileMetaData(readFileMetaData(from), filter);
+      }
+    });
+    if (Log.DEBUG) LOG.debug(fileMetaData);
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
+    if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
+    return parquetMetadata;
+  }
+
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
+    MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
+    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+    List<RowGroup> row_groups = parquetMetadata.getRow_groups();
+    if (row_groups != null) {
+      for (RowGroup rowGroup : row_groups) {
+        BlockMetaData blockMetaData = new BlockMetaData();
+        blockMetaData.setRowCount(rowGroup.getNum_rows());
+        blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        List<ColumnChunk> columns = rowGroup.getColumns();
+        String filePath = columns.get(0).getFile_path();
+        for (ColumnChunk columnChunk : columns) {
+          if ((filePath == null && columnChunk.getFile_path() != null)
+              || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
+            throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
+          }
+          ColumnMetaData metaData = columnChunk.meta_data;
+          ColumnPath path = getPath(metaData);
+          ColumnChunkMetaData column = ColumnChunkMetaData.get(
+              path,
+              messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
+              CompressionCodecName.fromParquet(metaData.codec),
+              fromFormatEncodings(metaData.encodings),
+              fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
+              metaData.data_page_offset,
+              metaData.dictionary_page_offset,
+              metaData.num_values,
+              metaData.total_compressed_size,
+              metaData.total_uncompressed_size);
+          // TODO
+          // index_page_offset
+          // key_value_metadata
+          blockMetaData.addColumn(column);
+        }
+        blockMetaData.setPath(filePath);
+        blocks.add(blockMetaData);
+      }
+    }
+    Map<String, String> keyValueMetaData = new HashMap<String, String>();
+    List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();
+    if (key_value_metadata != null) {
+      for (KeyValue keyValue : key_value_metadata) {
+        keyValueMetaData.put(keyValue.key, keyValue.value);
+      }
+    }
+    return new ParquetMetadata(
+        new org.apache.parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by()),
+        blocks);
+  }
+
+  private ColumnPath getPath(ColumnMetaData metaData) {
+    String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
+    return ColumnPath.get(path);
+  }
+
+  MessageType fromParquetSchema(List<SchemaElement> schema) {
+    Iterator<SchemaElement> iterator = schema.iterator();
+    SchemaElement root = iterator.next();
+    Types.MessageTypeBuilder builder = Types.buildMessage();
+    buildChildren(builder, iterator, root.getNum_children());
+    return builder.named(root.name);
+  }
+
+  private void buildChildren(Types.GroupBuilder builder,
+                             Iterator<SchemaElement> schema,
+                             int childrenCount) {
+    for (int i = 0; i < childrenCount; i++) {
+      SchemaElement schemaElement = schema.next();
+
+      // Create Parquet Type.
+      Types.Builder childBuilder;
+      if (schemaElement.type != null) {
+        Types.PrimitiveBuilder primitiveBuilder = builder.primitive(
+            getPrimitive(schemaElement.type),
+            fromParquetRepetition(schemaElement.repetition_type));
+        if (schemaElement.isSetType_length()) {
+          primitiveBuilder.length(schemaElement.type_length);
+        }
+        if (schemaElement.isSetPrecision()) {
+          primitiveBuilder.precision(schemaElement.precision);
+        }
+        if (schemaElement.isSetScale()) {
+          primitiveBuilder.scale(schemaElement.scale);
+        }
+        childBuilder = primitiveBuilder;
+
+      } else {
+        childBuilder = builder.group(fromParquetRepetition(schemaElement.repetition_type));
+        buildChildren((Types.GroupBuilder) childBuilder, schema, schemaElement.num_children);
+      }
+
+      if (schemaElement.isSetConverted_type()) {
+        childBuilder.as(getOriginalType(schemaElement.converted_type));
+      }
+      if (schemaElement.isSetField_id()) {
+        childBuilder.id(schemaElement.field_id);
+      }
+
+      childBuilder.named(schemaElement.name);
+    }
+  }
+
+  FieldRepetitionType toParquetRepetition(Repetition repetition) {
+    return FieldRepetitionType.valueOf(repetition.name());
+  }
+
+  Repetition fromParquetRepetition(FieldRepetitionType repetition) {
+    return Repetition.valueOf(repetition.name());
+  }
+
+  @Deprecated
+  public void writeDataPageHeader(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding,
+      OutputStream to) throws IOException {
+    writePageHeader(newDataPageHeader(uncompressedSize,
+                                      compressedSize,
+                                      valueCount,
+                                      new org.apache.parquet.column.statistics.BooleanStatistics(),
+                                      rlEncoding,
+                                      dlEncoding,
+                                      valuesEncoding), to);
+  }
+
+  public void writeDataPageHeader(
+      int uncompressedSize,
+      int compressedSize,
+      int valueCount,
+      org.apache.parquet.column.statistics.Statistics statistics,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding,
+      OutputStream to) throws IOException {
+    writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding), to);
+  }
+
+  private PageHeader newDataPageHeader(
+      int uncompressedSize, int compressedSize,
+      int valueCount,
+      org.apache.parquet.column.statistics.Statistics statistics,
+      org.apache.parquet.column.Encoding rlEncoding,
+      org.apache.parquet.column.Encoding dlEncoding,
+      org.apache.parquet.column.Encoding valuesEncoding) {
+    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
+    // TODO: pageHeader.crc = ...;
+    pageHeader.setData_page_header(new DataPageHeader(
+        valueCount,
+        getEncoding(valuesEncoding),
+        getEncoding(dlEncoding),
+        getEncoding(rlEncoding)));
+    if (!statistics.isEmpty()) {
+      pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
+    }
+    return pageHeader;
+  }
+
+  public void writeDataPageV2Header(
+      int uncompressedSize, int compressedSize,
+      int valueCount, int nullCount, int rowCount,
+      org.apache.parquet.column.statistics.Statistics statistics,
+      org.apache.parquet.column.Encoding dataEncoding,
+      int rlByteLength, int dlByteLength,
+      OutputStream to) throws IOException {
+    writePageHeader(
+        newDataPageV2Header(
+            uncompressedSize, compressedSize,
+            valueCount, nullCount, rowCount,
+            statistics,
+            dataEncoding,
+            rlByteLength, dlByteLength), to);
+  }
+
+  private PageHeader newDataPageV2Header(
+      int uncompressedSize, int compressedSize,
+      int valueCount, int nullCount, int rowCount,
+      org.apache.parquet.column.statistics.Statistics<?> statistics,
+      org.apache.parquet.column.Encoding dataEncoding,
+      int rlByteLength, int dlByteLength) {
+    // TODO: pageHeader.crc = ...;
+    DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
+        valueCount, nullCount, rowCount,
+        getEncoding(dataEncoding),
+        dlByteLength, rlByteLength);
+    if (!statistics.isEmpty()) {
+      dataPageHeaderV2.setStatistics(toParquetStatistics(statistics));
+    }
+    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
+    pageHeader.setData_page_header_v2(dataPageHeaderV2);
+    return pageHeader;
+  }
+
+  public void writeDictionaryPageHeader(
+      int uncompressedSize, int compressedSize, int valueCount,
+      org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
+    PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
+    pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
+    writePageHeader(pageHeader, to);
+  }
+
+}
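
A short sketch of how the range metadata filter above might be used to read a footer for a single input split; the footer stream and split offsets are hypothetical.

import java.io.IOException;
import java.io.InputStream;

import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class MetadataFilterSketch {
  // Keeps only the row groups whose midpoint falls in [splitStart, splitStart + splitLength).
  public static ParquetMetadata footerForSplit(InputStream footerStream,
                                               long splitStart,
                                               long splitLength) throws IOException {
    MetadataFilter filter = ParquetMetadataConverter.range(splitStart, splitStart + splitLength);
    return new ParquetMetadataConverter().readParquetMetadata(footerStream, filter);
  }
}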

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BadConfigurationException.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BadConfigurationException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BadConfigurationException.java
new file mode 100644
index 0000000..67d9bc4
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BadConfigurationException.java
@@ -0,0 +1,47 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+/**
+ * Thrown when the input/output formats are misconfigured
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class BadConfigurationException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public BadConfigurationException() {
+  }
+
+  public BadConfigurationException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public BadConfigurationException(String message) {
+    super(message);
+  }
+
+  public BadConfigurationException(Throwable cause) {
+    super(cause);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
new file mode 100644
index 0000000..6840950
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -0,0 +1,195 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+class CodecFactory {
+
+  public class BytesDecompressor {
+
+    private final CompressionCodec codec;
+    private final Decompressor decompressor;
+
+    public BytesDecompressor(CompressionCodec codec) {
+      this.codec = codec;
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+      } else {
+        decompressor = null;
+      }
+    }
+
+    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
+      final BytesInput decompressed;
+      if (codec != null) {
+        decompressor.reset();
+        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
+        decompressed = BytesInput.from(is, uncompressedSize);
+      } else {
+        decompressed = bytes;
+      }
+      return decompressed;
+    }
+
+    private void release() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+      }
+    }
+  }
+
+  /**
+   * Encapsulates the logic around hadoop compression
+   *
+   * @author Julien Le Dem
+   *
+   */
+  public static class BytesCompressor {
+
+    private final CompressionCodec codec;
+    private final Compressor compressor;
+    private final ByteArrayOutputStream compressedOutBuffer;
+    private final CompressionCodecName codecName;
+
+    public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
+      this.codecName = codecName;
+      this.codec = codec;
+      if (codec != null) {
+        this.compressor = CodecPool.getCompressor(codec);
+        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
+      } else {
+        this.compressor = null;
+        this.compressedOutBuffer = null;
+      }
+    }
+
+    public BytesInput compress(BytesInput bytes) throws IOException {
+      final BytesInput compressedBytes;
+      if (codec == null) {
+        compressedBytes = bytes;
+      } else {
+        compressedOutBuffer.reset();
+        if (compressor != null) {
+          // null compressor for non-native gzip
+          compressor.reset();
+        }
+        CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
+        bytes.writeAllTo(cos);
+        cos.finish();
+        cos.close();
+        compressedBytes = BytesInput.from(compressedOutBuffer);
+      }
+      return compressedBytes;
+    }
+
+    private void release() {
+      if (compressor != null) {
+        CodecPool.returnCompressor(compressor);
+      }
+    }
+
+    public CompressionCodecName getCodecName() {
+      return codecName;
+    }
+
+  }
+
+  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
+  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
+  private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
+  private final Configuration configuration;
+
+  public CodecFactory(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  /**
+   *
+   * @param codecName the requested codec
+   * @return the corresponding hadoop codec. null if UNCOMPRESSED
+   */
+  private CompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
+    }
+    CompressionCodec codec = codecByName.get(codecClassName);
+    if (codec != null) {
+      return codec;
+    }
+
+    try {
+      Class<?> codecClass = Class.forName(codecClassName);
+      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
+      codecByName.put(codecClassName, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
+    }
+  }
+
+  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
+    BytesCompressor comp = compressors.get(codecName);
+    if (comp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      comp = new BytesCompressor(codecName, codec, pageSize);
+      compressors.put(codecName, comp);
+    }
+    return comp;
+  }
+
+  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+    BytesDecompressor decomp = decompressors.get(codecName);
+    if (decomp == null) {
+      CompressionCodec codec = getCodec(codecName);
+      decomp = new BytesDecompressor(codec);
+      decompressors.put(codecName, decomp);
+    }
+    return decomp;
+  }
+
+  public void release() {
+    for (BytesCompressor compressor : compressors.values()) {
+      compressor.release();
+    }
+    compressors.clear();
+    for (BytesDecompressor decompressor : decompressors.values()) {
+      decompressor.release();
+    }
+    decompressors.clear();
+  }
+}
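
A sketch of how code inside the org.apache.parquet.hadoop package might use the (package-private) CodecFactory above for a compress/decompress round trip; GZIP and the one-megabyte page size are illustrative choices, not part of this commit.

package org.apache.parquet.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

class CodecFactorySketch {
  static void roundTrip(byte[] raw) throws IOException {
    CodecFactory codecs = new CodecFactory(new Configuration());
    try {
      CodecFactory.BytesCompressor compressor =
          codecs.getCompressor(CompressionCodecName.GZIP, 1024 * 1024);
      BytesInput compressed = compressor.compress(BytesInput.from(raw));

      CodecFactory.BytesDecompressor decompressor =
          codecs.getDecompressor(CompressionCodecName.GZIP);
      BytesInput decompressed =
          decompressor.decompress(BytesInput.from(compressed.toByteArray()), raw.length);
      assert decompressed.size() == raw.length;
    } finally {
      codecs.release(); // returns pooled (de)compressors to Hadoop's CodecPool
    }
  }
}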

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
new file mode 100644
index 0000000..b6934c2
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -0,0 +1,170 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.parquet.Ints;
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * TODO: should this actually be called RowGroupImpl or something?
+ * The name is kind of confusing since it references three different "entities"
+ * in our format: columns, chunks, and pages
+ *
+ */
+class ColumnChunkPageReadStore implements PageReadStore {
+  private static final Log LOG = Log.getLog(ColumnChunkPageReadStore.class);
+
+  /**
+   * PageReader for a single column chunk. A column chunk contains
+   * several pages, which are yielded one by one in order.
+   *
+   * This implementation is provided with a list of pages, each of which
+   * is decompressed and passed through.
+   */
+  static final class ColumnChunkPageReader implements PageReader {
+
+    private final BytesDecompressor decompressor;
+    private final long valueCount;
+    private final List<DataPage> compressedPages;
+    private final DictionaryPage compressedDictionaryPage;
+
+    ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
+      this.decompressor = decompressor;
+      this.compressedPages = new LinkedList<DataPage>(compressedPages);
+      this.compressedDictionaryPage = compressedDictionaryPage;
+      int count = 0;
+      for (DataPage p : compressedPages) {
+        count += p.getValueCount();
+      }
+      this.valueCount = count;
+    }
+
+    @Override
+    public long getTotalValueCount() {
+      return valueCount;
+    }
+
+    @Override
+    public DataPage readPage() {
+      if (compressedPages.isEmpty()) {
+        return null;
+      }
+      DataPage compressedPage = compressedPages.remove(0);
+      return compressedPage.accept(new DataPage.Visitor<DataPage>() {
+        @Override
+        public DataPage visit(DataPageV1 dataPageV1) {
+          try {
+            return new DataPageV1(
+                decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()),
+                dataPageV1.getValueCount(),
+                dataPageV1.getUncompressedSize(),
+                dataPageV1.getStatistics(),
+                dataPageV1.getRlEncoding(),
+                dataPageV1.getDlEncoding(),
+                dataPageV1.getValueEncoding());
+          } catch (IOException e) {
+            throw new ParquetDecodingException("could not decompress page", e);
+          }
+        }
+
+        @Override
+        public DataPage visit(DataPageV2 dataPageV2) {
+          if (!dataPageV2.isCompressed()) {
+            return dataPageV2;
+          }
+          try {
+            int uncompressedSize = Ints.checkedCast(
+                dataPageV2.getUncompressedSize()
+                - dataPageV2.getDefinitionLevels().size()
+                - dataPageV2.getRepetitionLevels().size());
+            return DataPageV2.uncompressed(
+                dataPageV2.getRowCount(),
+                dataPageV2.getNullCount(),
+                dataPageV2.getValueCount(),
+                dataPageV2.getRepetitionLevels(),
+                dataPageV2.getDefinitionLevels(),
+                dataPageV2.getDataEncoding(),
+                decompressor.decompress(dataPageV2.getData(), uncompressedSize),
+                dataPageV2.getStatistics()
+                );
+          } catch (IOException e) {
+            throw new ParquetDecodingException("could not decompress page", e);
+          }
+        }
+      });
+    }
+
+    @Override
+    public DictionaryPage readDictionaryPage() {
+      if (compressedDictionaryPage == null) {
+        return null;
+      }
+      try {
+        return new DictionaryPage(
+            decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
+            compressedDictionaryPage.getDictionarySize(),
+            compressedDictionaryPage.getEncoding());
+      } catch (IOException e) {
+        throw new RuntimeException(e); // TODO: cleanup
+      }
+    }
+  }
+
+  private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
+  private final long rowCount;
+
+  public ColumnChunkPageReadStore(long rowCount) {
+    this.rowCount = rowCount;
+  }
+
+  @Override
+  public long getRowCount() {
+    return rowCount;
+  }
+
+  @Override
+  public PageReader getPageReader(ColumnDescriptor path) {
+    if (!readers.containsKey(path)) {
+      throw new IllegalArgumentException(path + " is not in the store: " + readers.keySet() + " " + rowCount);
+    }
+    return readers.get(path);
+  }
+
+  void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) {
+    if (readers.put(path, reader) != null) {
+      throw new RuntimeException(path + " was added twice");
+    }
+  }
+
+}
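
For orientation, a sketch (not from the patch) of how a consumer drains one column chunk once it holds a PageReadStore for a row group, for example from ParquetFileReader.readNextRowGroup() as the record reader later in this commit does. Only the public page interfaces implemented above are used: the dictionary page is returned separately, and readPage() yields already-decompressed pages until it returns null.

// Illustrative sketch only; obtaining the PageReadStore (e.g. via
// ParquetFileReader.readNextRowGroup()) is outside this excerpt.
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;

class PageDrainSketch {
  static void drainColumn(PageReadStore rowGroup, ColumnDescriptor column) {
    PageReader pages = rowGroup.getPageReader(column);

    // The dictionary page, if any, is exposed separately from the data pages.
    DictionaryPage dictionary = pages.readDictionaryPage();
    if (dictionary != null) {
      System.out.println("dictionary with " + dictionary.getDictionarySize() + " entries");
    }

    // Data pages come back one by one, already decompressed, until null.
    long remaining = pages.getTotalValueCount();
    for (DataPage page = pages.readPage(); page != null; page = pages.readPage()) {
      remaining -= page.getValueCount();
      System.out.println("data page with " + page.getValueCount() + " values");
    }
    System.out.println("values left unread: " + remaining); // 0 once the chunk is drained
  }
}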

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
new file mode 100644
index 0000000..0a0b316
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.Log.INFO;
+import static org.apache.parquet.column.statistics.Statistics.getStatsBasedOnType;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.ConcatenatingByteArrayCollector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageWriteStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.schema.MessageType;
+
+class ColumnChunkPageWriteStore implements PageWriteStore {
+  private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+  private static final class ColumnChunkPageWriter implements PageWriter {
+
+    private final ColumnDescriptor path;
+    private final BytesCompressor compressor;
+
+    private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
+    private final ConcatenatingByteArrayCollector buf;
+    private DictionaryPage dictionaryPage;
+
+    private long uncompressedLength;
+    private long compressedLength;
+    private long totalValueCount;
+    private int pageCount;
+
+    private Set<Encoding> encodings = new HashSet<Encoding>();
+
+    private Statistics totalStatistics;
+
+    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) {
+      this.path = path;
+      this.compressor = compressor;
+      this.buf = new ConcatenatingByteArrayCollector();
+      this.totalStatistics = getStatsBasedOnType(this.path.getType());
+    }
+
+    @Override
+    public void writePage(BytesInput bytes,
+                          int valueCount,
+                          Statistics statistics,
+                          Encoding rlEncoding,
+                          Encoding dlEncoding,
+                          Encoding valuesEncoding) throws IOException {
+      long uncompressedSize = bytes.size();
+      if (uncompressedSize > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write page larger than Integer.MAX_VALUE bytes: " +
+            uncompressedSize);
+      }
+      BytesInput compressedBytes = compressor.compress(bytes);
+      long compressedSize = compressedBytes.size();
+      if (compressedSize > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
+            + compressedSize);
+      }
+      tempOutputStream.reset();
+      parquetMetadataConverter.writeDataPageHeader(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          tempOutputStream);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+      // by concatenating before collecting instead of collecting twice,
+      // we only allocate one buffer to copy into instead of multiple.
+      buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
+      encodings.add(rlEncoding);
+      encodings.add(dlEncoding);
+      encodings.add(valuesEncoding);
+    }
+
+    @Override
+    public void writePageV2(
+        int rowCount, int nullCount, int valueCount,
+        BytesInput repetitionLevels, BytesInput definitionLevels,
+        Encoding dataEncoding, BytesInput data,
+        Statistics<?> statistics) throws IOException {
+      int rlByteLength = toIntWithCheck(repetitionLevels.size());
+      int dlByteLength = toIntWithCheck(definitionLevels.size());
+      int uncompressedSize = toIntWithCheck(
+          data.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      // TODO: decide if we compress
+      BytesInput compressedData = compressor.compress(data);
+      int compressedSize = toIntWithCheck(
+          compressedData.size() + repetitionLevels.size() + definitionLevels.size()
+      );
+      tempOutputStream.reset();
+      parquetMetadataConverter.writeDataPageV2Header(
+          uncompressedSize, compressedSize,
+          valueCount, nullCount, rowCount,
+          statistics,
+          dataEncoding,
+          rlByteLength,
+          dlByteLength,
+          tempOutputStream);
+      this.uncompressedLength += uncompressedSize;
+      this.compressedLength += compressedSize;
+      this.totalValueCount += valueCount;
+      this.pageCount += 1;
+      this.totalStatistics.mergeStatistics(statistics);
+
+      // by concatenating before collecting instead of collecting twice,
+      // we only allocate one buffer to copy into instead of multiple.
+      buf.collect(
+          BytesInput.concat(
+            BytesInput.from(tempOutputStream),
+            repetitionLevels,
+            definitionLevels,
+            compressedData)
+      );
+      encodings.add(dataEncoding);
+    }
+
+    private int toIntWithCheck(long size) {
+      if (size > Integer.MAX_VALUE) {
+        throw new ParquetEncodingException(
+            "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
+            size);
+      }
+      return (int)size;
+    }
+
+    @Override
+    public long getMemSize() {
+      return buf.size();
+    }
+
+    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
+      writer.startColumn(path, totalValueCount, compressor.getCodecName());
+      if (dictionaryPage != null) {
+        writer.writeDictionaryPage(dictionaryPage);
+        encodings.add(dictionaryPage.getEncoding());
+      }
+      writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
+      writer.endColumn();
+      if (INFO) {
+        LOG.info(
+            String.format(
+                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
+                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
+            + (dictionaryPage != null ? String.format(
+                    ", dic { %,d entries, %,dB raw, %,dB comp}",
+                    dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getBytes().size())
+                    : ""));
+      }
+      encodings.clear();
+      pageCount = 0;
+    }
+
+    @Override
+    public long allocatedSize() {
+      return buf.size();
+    }
+
+    @Override
+    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
+      if (this.dictionaryPage != null) {
+        throw new ParquetEncodingException("Only one dictionary page is allowed");
+      }
+      BytesInput dictionaryBytes = dictionaryPage.getBytes();
+      int uncompressedSize = (int)dictionaryBytes.size();
+      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
+      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
+    }
+
+    @Override
+    public String memUsageString(String prefix) {
+      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
+    }
+  }
+
+  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
+  private final MessageType schema;
+
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
+    this.schema = schema;
+    for (ColumnDescriptor path : schema.getColumns()) {
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
+    }
+  }
+
+  @Override
+  public PageWriter getPageWriter(ColumnDescriptor path) {
+    return writers.get(path);
+  }
+
+  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
+    for (ColumnDescriptor path : schema.getColumns()) {
+      ColumnChunkPageWriter pageWriter = writers.get(path);
+      pageWriter.writeToFileWriter(writer);
+    }
+  }
+
+}
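
On the write side, the store above buffers one ColumnChunkPageWriter per leaf column, compresses every page with the shared BytesCompressor, and hands the finished column chunks to a ParquetFileWriter. A sketch of that flush step (not from the patch); the ParquetFileWriter startBlock/endBlock calls are assumed from elsewhere in parquet-hadoop, and the per-value write phase is elided.

// Illustrative sketch only; ParquetFileWriter.startBlock/endBlock are assumed from
// elsewhere in parquet-hadoop, and the per-value write phase is elided.
package org.apache.parquet.hadoop;

import java.io.IOException;

import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.schema.MessageType;

class RowGroupFlushSketch {
  static void flushRowGroup(MessageType schema,
                            BytesCompressor compressor,
                            ParquetFileWriter fileWriter,
                            long rowCount) throws IOException {
    // One buffered ColumnChunkPageWriter per leaf column, all sharing the same compressor.
    ColumnChunkPageWriteStore pageStore =
        new ColumnChunkPageWriteStore(compressor, schema, 1024 * 1024);

    // ... the column writers fill the store here via pageStore.getPageWriter(column) ...

    fileWriter.startBlock(rowCount);          // assumed ParquetFileWriter API
    pageStore.flushToFileWriter(fileWriter);  // one startColumn/endColumn per column chunk
    fileWriter.endBlock();                    // assumed ParquetFileWriter API
  }
}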

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Footer.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Footer.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Footer.java
new file mode 100644
index 0000000..e707a5a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Footer.java
@@ -0,0 +1,57 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+/**
+ *
+ * Represents the footer for a given file.
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class Footer {
+
+  private final Path file;
+
+  private final ParquetMetadata parquetMetadata;
+
+  public Footer(Path file, ParquetMetadata parquetMetadata) {
+    super();
+    this.file = file;
+    this.parquetMetadata = parquetMetadata;
+  }
+
+  public Path getFile() {
+    return file;
+  }
+
+  public ParquetMetadata getParquetMetadata() {
+    return parquetMetadata;
+  }
+
+  @Override
+  public String toString() {
+    return "Footer{"+file+", "+parquetMetadata+"}";
+  }
+}
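
Footer is only the (file path, metadata) pair that footer-scanning helpers return. A small sketch of typical use (not from the patch); it assumes ParquetFileReader.readFooters(Configuration, Path) from elsewhere in parquet-hadoop.

// Illustrative sketch only; ParquetFileReader.readFooters(Configuration, Path) is
// assumed from elsewhere in parquet-hadoop.
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.hadoop.Footer;
import org.apache.parquet.hadoop.ParquetFileReader;

public class ListRowGroups {
  public static void main(String[] args) throws Exception {
    // args[0] is a Parquet file or a directory of Parquet files.
    List<Footer> footers = ParquetFileReader.readFooters(new Configuration(), new Path(args[0]));
    for (Footer footer : footers) {
      System.out.println(footer.getFile() + ": "
          + footer.getParquetMetadata().getBlocks().size() + " row group(s)");
    }
  }
}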

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
new file mode 100644
index 0000000..d40e87f
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -0,0 +1,244 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.hadoop.api.InitContext;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import static java.lang.String.format;
+import static org.apache.parquet.Log.DEBUG;
+import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
+
+class InternalParquetRecordReader<T> {
+  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
+
+  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+  private final Filter filter;
+
+  private MessageType requestedSchema;
+  private MessageType fileSchema;
+  private int columnCount;
+  private final ReadSupport<T> readSupport;
+
+  private RecordMaterializer<T> recordConverter;
+
+  private T currentValue;
+  private long total;
+  private long current = 0;
+  private int currentBlock = -1;
+  private ParquetFileReader reader;
+  private org.apache.parquet.io.RecordReader<T> recordReader;
+  private boolean strictTypeChecking;
+
+  private long totalTimeSpentReadingBytes;
+  private long totalTimeSpentProcessingRecords;
+  private long startedAssemblingCurrentBlockAt;
+
+  private long totalCountLoadedSoFar = 0;
+
+  private Path file;
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   * @param filter for filtering individual records
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
+    this.readSupport = readSupport;
+    this.filter = checkNotNull(filter, "filter");
+  }
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport) {
+    this(readSupport, FilterCompat.NOOP);
+  }
+
+  /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   * @param filter Optional filter for only returning matching records.
+   * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)}
+   */
+  @Deprecated
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+    this(readSupport, FilterCompat.get(filter));
+  }
+
+  private void checkRead() throws IOException {
+    if (current == totalCountLoadedSoFar) {
+      if (current != 0) {
+        totalTimeSpentProcessingRecords += (System.currentTimeMillis() - startedAssemblingCurrentBlockAt);
+        if (Log.INFO) {
+          LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: " + ((float) totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float) totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+          final long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+          if (totalTime != 0) {
+            final long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+            final long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+            LOG.info("time spent so far " + percentReading + "% reading (" + totalTimeSpentReadingBytes + " ms) and " + percentProcessing + "% processing (" + totalTimeSpentProcessingRecords + " ms)");
+          }
+        }
+      }
+
+      LOG.info("at row " + current + ". reading next block");
+      long t0 = System.currentTimeMillis();
+      PageReadStore pages = reader.readNextRowGroup();
+      if (pages == null) {
+        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
+      }
+      long timeSpentReading = System.currentTimeMillis() - t0;
+      totalTimeSpentReadingBytes += timeSpentReading;
+      BenchmarkCounter.incrementTime(timeSpentReading);
+      if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
+      recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
+      startedAssemblingCurrentBlockAt = System.currentTimeMillis();
+      totalCountLoadedSoFar += pages.getRowCount();
+      ++ currentBlock;
+    }
+  }
+
+  public void close() throws IOException {
+    if (reader != null) {
+      reader.close();
+    }
+  }
+
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  public T getCurrentValue() throws IOException,
+      InterruptedException {
+    return currentValue;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return (float) current / total;
+  }
+
+  public void initialize(MessageType fileSchema,
+      Map<String, String> fileMetadata,
+      Path file, List<BlockMetaData> blocks, Configuration configuration)
+      throws IOException {
+    // initialize a ReadContext for this file
+    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
+        configuration, toSetMultiMap(fileMetadata), fileSchema));
+    this.requestedSchema = readContext.getRequestedSchema();
+    this.fileSchema = fileSchema;
+    this.file = file;
+    this.columnCount = requestedSchema.getPaths().size();
+    this.recordConverter = readSupport.prepareForRead(
+        configuration, fileMetadata, fileSchema, readContext);
+    this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    reader = new ParquetFileReader(configuration, file, blocks, columns);
+    for (BlockMetaData block : blocks) {
+      total += block.getRowCount();
+    }
+    LOG.info("RecordReader initialized; will read a total of " + total + " records.");
+  }
+
+  private boolean contains(GroupType group, String[] path, int index) {
+    if (index == path.length) {
+      return false;
+    }
+    if (group.containsField(path[index])) {
+      Type type = group.getType(path[index]);
+      if (type.isPrimitive()) {
+        return index + 1 == path.length;
+      } else {
+        return contains(type.asGroupType(), path, index + 1);
+      }
+    }
+    return false;
+  }
+
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    boolean recordFound = false;
+
+    while (!recordFound) {
+      // no more records left
+      if (current >= total) { return false; }
+
+      try {
+        checkRead();
+        currentValue = recordReader.read();
+        current ++;
+        if (recordReader.shouldSkipCurrentRecord()) {
+          // this record is being filtered via the filter2 package
+          if (DEBUG) LOG.debug("skipping record");
+          continue;
+        }
+
+        if (currentValue == null) {
+          // only happens with FilteredRecordReader at end of block
+          current = totalCountLoadedSoFar;
+          if (DEBUG) LOG.debug("filtered record reader reached end of block");
+          continue;
+        }
+
+        recordFound = true;
+
+        if (DEBUG) LOG.debug("read value: " + currentValue);
+      } catch (RuntimeException e) {
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
+      }
+    }
+    return true;
+  }
+
+  private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
+    Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
+    for (Map.Entry<K, V> entry : map.entrySet()) {
+      Set<V> set = new HashSet<V>();
+      set.add(entry.getValue());
+      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+    }
+    return Collections.unmodifiableMap(setMultiMap);
+  }
+
+}
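
Wrappers such as ParquetRecordReader drive this class with the Hadoop-style loop sketched below (not from the patch); it assumes the reader has already been constructed with a ReadSupport and that initialize(...) has been called with the file schema, footer blocks, and Configuration. nextKeyValue() skips records dropped by the filter2 path internally and returns false once every row group is exhausted.

// Illustrative sketch only; constructing the reader and calling initialize(...)
// with the footer blocks is assumed to have happened already.
package org.apache.parquet.hadoop;

import java.io.IOException;

class RecordDrainSketch {
  static <T> long drain(InternalParquetRecordReader<T> reader)
      throws IOException, InterruptedException {
    long count = 0;
    try {
      while (reader.nextKeyValue()) {        // false once all row groups are exhausted
        T record = reader.getCurrentValue(); // hand off to the actual consumer here
        count++;
        if (count % 100000 == 0) {
          System.out.println(String.format("read %,d records (%.0f%%)", count, reader.getProgress() * 100));
        }
      }
    } finally {
      reader.close();
    }
    return count;
  }
}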

