parquet-commits mailing list archives

From b...@apache.org
Subject [10/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet.
Date Mon, 27 Apr 2015 23:12:07 GMT
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
new file mode 100644
index 0000000..1540f03
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
@@ -0,0 +1,47 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapreduce;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.hadoop.util.counters.CounterLoader;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Concrete factory for counters in the mapreduce API;
+ * returns a counter backed by the mapreduce API when the corresponding flag is set, otherwise returns a NullCounter
+ * @author Tianshuo Deng
+ */
+public class MapReduceCounterLoader implements CounterLoader {
+  private TaskInputOutputContext<?, ?, ?, ?> context;
+
+  public MapReduceCounterLoader(TaskInputOutputContext<?, ?, ?, ?> context) {
+    this.context = context;
+  }
+
+  @Override
+  public ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag) {
+    if (ContextUtil.getConfiguration(context).getBoolean(counterFlag, true)) {
+      return new MapReduceCounterAdapter(ContextUtil.getCounter(context, groupName, counterName));
+    } else {
+      return new BenchmarkCounter.NullCounter();
+    }
+  }
+}
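
For orientation, a minimal sketch of how this loader could be wired up inside a Hadoop task. Only the constructor and getCounterByNameAndFlag come from the class above; the group, counter, and flag names are made-up placeholders.

    import org.apache.hadoop.mapreduce.TaskInputOutputContext;
    import org.apache.parquet.hadoop.util.counters.CounterLoader;
    import org.apache.parquet.hadoop.util.counters.ICounter;
    import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader;

    public class CounterLoaderSketch {
      // Returns a live mapreduce counter when the boolean flag is set (or absent,
      // since the loader defaults it to true), otherwise a no-op NullCounter.
      static ICounter bytesReadCounter(TaskInputOutputContext<?, ?, ?, ?> context) {
        CounterLoader loader = new MapReduceCounterLoader(context);
        // placeholder group/name/flag values, not the ones parquet-hadoop actually uses
        return loader.getCounterByNameAndFlag(
            "parquet", "bytesread", "parquet.counters.bytesread.enabled");
      }
    }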

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
deleted file mode 100644
index fbfda49..0000000
--- a/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.compat;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.filter2.compat.FilterCompat.NoOpFilter;
-import parquet.filter2.compat.FilterCompat.Visitor;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.filter2.predicate.SchemaCompatibilityValidator;
-import parquet.filter2.statisticslevel.StatisticsFilter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.schema.MessageType;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Given a {@link Filter} applies it to a list of BlockMetaData (row groups)
- * If the Filter is an {@link parquet.filter.UnboundRecordFilter} or the no op filter,
- * no filtering will be performed.
- */
-public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
-  private final List<BlockMetaData> blocks;
-  private final MessageType schema;
-
-  public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
-    checkNotNull(filter, "filter");
-    return filter.accept(new RowGroupFilter(blocks, schema));
-  }
-
-  private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
-    this.blocks = checkNotNull(blocks, "blocks");
-    this.schema = checkNotNull(schema, "schema");
-  }
-
-  @Override
-  public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
-    FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
-
-    // check that the schema of the filter matches the schema of the file
-    SchemaCompatibilityValidator.validate(filterPredicate, schema);
-
-    List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();
-
-    for (BlockMetaData block : blocks) {
-      if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
-        filteredBlocks.add(block);
-      }
-    }
-
-    return filteredBlocks;
-  }
-
-  @Override
-  public List<BlockMetaData> visit(FilterCompat.UnboundRecordFilterCompat unboundRecordFilterCompat) {
-    return blocks;
-  }
-
-  @Override
-  public List<BlockMetaData> visit(NoOpFilter noOpFilter) {
-    return blocks;
-  }
-}
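
As a rough usage sketch (assuming FilterApi and FilterCompat.get are available from parquet-column; the column name and value are illustrative), the row-group filter above is typically driven like this:

    import java.util.List;

    import parquet.filter2.compat.FilterCompat;
    import parquet.filter2.compat.FilterCompat.Filter;
    import parquet.filter2.compat.RowGroupFilter;
    import parquet.filter2.predicate.FilterApi;
    import parquet.filter2.predicate.FilterPredicate;
    import parquet.hadoop.metadata.BlockMetaData;
    import parquet.schema.MessageType;

    public class RowGroupFilterSketch {
      // Keeps only the row groups whose statistics might contain id == 7.
      static List<BlockMetaData> prune(List<BlockMetaData> blocks, MessageType schema) {
        FilterPredicate pred = FilterApi.eq(FilterApi.intColumn("id"), 7);
        Filter filter = FilterCompat.get(pred);
        return RowGroupFilter.filterRowGroups(filter, blocks, schema);
      }
    }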

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
deleted file mode 100644
index 7a7cd06..0000000
--- a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.statisticslevel;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import parquet.column.statistics.Statistics;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.filter2.predicate.Operators.And;
-import parquet.filter2.predicate.Operators.Column;
-import parquet.filter2.predicate.Operators.Eq;
-import parquet.filter2.predicate.Operators.Gt;
-import parquet.filter2.predicate.Operators.GtEq;
-import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
-import parquet.filter2.predicate.Operators.Lt;
-import parquet.filter2.predicate.Operators.LtEq;
-import parquet.filter2.predicate.Operators.Not;
-import parquet.filter2.predicate.Operators.NotEq;
-import parquet.filter2.predicate.Operators.Or;
-import parquet.filter2.predicate.Operators.UserDefined;
-import parquet.filter2.predicate.UserDefinedPredicate;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-import static parquet.Preconditions.checkArgument;
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Applies a {@link parquet.filter2.predicate.FilterPredicate} to statistics about a group of
- * records.
- *
- * Note: the supplied predicate must not contain any instances of the not() operator as this is not
- * supported by this filter.
- *
- * the supplied predicate should first be run through {@link parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
- * in a form that doesn't make use of the not() operator.
- *
- * the supplied predicate should also have already been run through
- * {@link parquet.filter2.predicate.SchemaCompatibilityValidator}
- * to make sure it is compatible with the schema of this file.
- *
- * Returns true if all the records represented by the statistics in the provided column metadata can be dropped.
- *         false otherwise (including when it is not known, which is often the case).
- */
-// TODO: this belongs in the parquet-column project, but some of the classes here need to be moved too
-// TODO: (https://issues.apache.org/jira/browse/PARQUET-38)
-public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
-
-  public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns) {
-    checkNotNull(pred, "pred");
-    checkNotNull(columns, "columns");
-    return pred.accept(new StatisticsFilter(columns));
-  }
-
-  private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
-
-  private StatisticsFilter(List<ColumnChunkMetaData> columnsList) {
-    for (ColumnChunkMetaData chunk : columnsList) {
-      columns.put(chunk.getPath(), chunk);
-    }
-  }
-
-  private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
-    ColumnChunkMetaData c = columns.get(columnPath);
-    checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
-    return c;
-  }
-
-  // is this column chunk composed entirely of nulls?
-  // assumes the column chunk's statistics is not empty
-  private boolean isAllNulls(ColumnChunkMetaData column) {
-    return column.getStatistics().getNumNulls() == column.getValueCount();
-  }
-
-  // are there any nulls in this column chunk?
-  // assumes the column chunk's statistics is not empty
-  private boolean hasNulls(ColumnChunkMetaData column) {
-    return column.getStatistics().getNumNulls() > 0;
-  }
-
-  @Override
-  public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
-    Column<T> filterColumn = eq.getColumn();
-    T value = eq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
-
-    if (stats.isEmpty()) {
-      // we have no statistics available, we cannot drop any chunks
-      return false;
-    }
-
-    if (value == null) {
-      // we are looking for records where v eq(null)
-      // so drop if there are no nulls in this chunk
-      return !hasNulls(columnChunk);
-    }
-
-    if (isAllNulls(columnChunk)) {
-      // we are looking for records where v eq(someNonNull)
-      // and this is a column of all nulls, so drop it
-      return true;
-    }
-
-    // drop if value < min || value > max
-    return value.compareTo(stats.genericGetMin()) < 0 || value.compareTo(stats.genericGetMax()) > 0;
-  }
-
-  @Override
-  public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
-    Column<T> filterColumn = notEq.getColumn();
-    T value = notEq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
-
-    if (stats.isEmpty()) {
-      // we have no statistics available, we cannot drop any chunks
-      return false;
-    }
-
-    if (value == null) {
-      // we are looking for records where v notEq(null)
-      // so, if this is a column of all nulls, we can drop it
-      return isAllNulls(columnChunk);
-    }
-
-    if (hasNulls(columnChunk)) {
-      // we are looking for records where v notEq(someNonNull)
-      // but this chunk contains nulls, we cannot drop it
-      return false;
-    }
-
-    // drop if this is a column where min = max = value
-    return value.compareTo(stats.genericGetMin()) == 0 && value.compareTo(stats.genericGetMax()) == 0;
-  }
-
-  @Override
-  public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
-    Column<T> filterColumn = lt.getColumn();
-    T value = lt.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
-
-    if (stats.isEmpty()) {
-      // we have no statistics available, we cannot drop any chunks
-      return false;
-    }
-
-    if (isAllNulls(columnChunk)) {
-      // we are looking for records where v < someValue
-      // this chunk is all nulls, so we can drop it
-      return true;
-    }
-
-    // drop if value <= min
-    return  value.compareTo(stats.genericGetMin()) <= 0;
-  }
-
-  @Override
-  public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
-    Column<T> filterColumn = ltEq.getColumn();
-    T value = ltEq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
-
-    if (stats.isEmpty()) {
-      // we have no statistics available, we cannot drop any chunks
-      return false;
-    }
-
-    if (isAllNulls(columnChunk)) {
-      // we are looking for records where v <= someValue
-      // this chunk is all nulls, so we can drop it
-      return true;
-    }
-
-    // drop if value < min
-    return value.compareTo(stats.genericGetMin()) < 0;
-  }
-
-  @Override
-  public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
-    Column<T> filterColumn = gt.getColumn();
-    T value = gt.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
-
-    if (stats.isEmpty()) {
-      // we have no statistics available, we cannot drop any chunks
-      return false;
-    }
-
-    if (isAllNulls(columnChunk)) {
-      // we are looking for records where v > someValue
-      // this chunk is all nulls, so we can drop it
-      return true;
-    }
-
-    // drop if value >= max
-    return value.compareTo(stats.genericGetMax()) >= 0;
-  }
-
-  @Override
-  public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
-    Column<T> filterColumn = gtEq.getColumn();
-    T value = gtEq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
-
-    if (stats.isEmpty()) {
-      // we have no statistics available, we cannot drop any chunks
-      return false;
-    }
-
-    if (isAllNulls(columnChunk)) {
-      // we are looking for records where v >= someValue
-      // this chunk is all nulls, so we can drop it
-      return true;
-    }
-
-    // drop if value > max
-    return value.compareTo(stats.genericGetMax()) > 0;
-  }
-
-  @Override
-  public Boolean visit(And and) {
-    // seems unintuitive to put an || not an && here but we can
-    // drop a chunk of records if we know that either the left or
-    // the right predicate agrees that no matter what we don't
-    // need this chunk.
-    return and.getLeft().accept(this) || and.getRight().accept(this);
-  }
-
-  @Override
-  public Boolean visit(Or or) {
-    // seems unintuitive to put an && not an || here
-    // but we can only drop a chunk of records if we know that
-    // both the left and right predicates agree that no matter what
-    // we don't need this chunk.
-    return or.getLeft().accept(this) && or.getRight().accept(this);
-  }
-
-  @Override
-  public Boolean visit(Not not) {
-    throw new IllegalArgumentException(
-        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
-  }
-
-  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
-    Column<T> filterColumn = ud.getColumn();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    U udp = ud.getUserDefinedPredicate();
-    Statistics<T> stats = columnChunk.getStatistics();
-
-    if (stats.isEmpty()) {
-      // we have no statistics available, we cannot drop any chunks
-      return false;
-    }
-
-    if (isAllNulls(columnChunk)) {
-      // there is no min max, there is nothing
-      // else we can say about this chunk, we
-      // cannot drop it.
-      return false;
-    }
-
-    parquet.filter2.predicate.Statistics<T> udpStats =
-        new parquet.filter2.predicate.Statistics<T>(stats.genericGetMin(), stats.genericGetMax());
-
-    if (inverted) {
-      return udp.inverseCanDrop(udpStats);
-    } else {
-      return udp.canDrop(udpStats);
-    }
-  }
-
-  @Override
-  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud) {
-    return visit(ud, false);
-  }
-
-  @Override
-  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> lnud) {
-    return visit(lnud.getUserDefined(), true);
-  }
-
-}
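
A sketch of the drop check in isolation, assuming FilterApi and LogicalInverseRewriter from parquet-column (the column name and bounds are illustrative):

    import parquet.filter2.predicate.FilterApi;
    import parquet.filter2.predicate.FilterPredicate;
    import parquet.filter2.predicate.LogicalInverseRewriter;
    import parquet.filter2.statisticslevel.StatisticsFilter;
    import parquet.hadoop.metadata.BlockMetaData;

    public class StatisticsFilterSketch {
      // True when every record in the row group can be skipped for "100 < id < 200".
      static boolean canSkip(BlockMetaData block) {
        FilterPredicate raw = FilterApi.and(
            FilterApi.gt(FilterApi.intColumn("id"), 100),
            FilterApi.lt(FilterApi.intColumn("id"), 200));
        // strip any not() operators first, as the javadoc above requires
        FilterPredicate rewritten = LogicalInverseRewriter.rewrite(raw);
        return StatisticsFilter.canDrop(rewritten, block.getColumns());
      }
    }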

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
deleted file mode 100644
index d763882..0000000
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.format.converter;
-
-import static parquet.format.Util.readFileMetaData;
-import static parquet.format.Util.writePageHeader;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import parquet.Log;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.format.ColumnChunk;
-import parquet.format.ColumnMetaData;
-import parquet.format.ConvertedType;
-import parquet.format.DataPageHeader;
-import parquet.format.DataPageHeaderV2;
-import parquet.format.DictionaryPageHeader;
-import parquet.format.Encoding;
-import parquet.format.FieldRepetitionType;
-import parquet.format.FileMetaData;
-import parquet.format.KeyValue;
-import parquet.format.PageHeader;
-import parquet.format.PageType;
-import parquet.format.RowGroup;
-import parquet.format.SchemaElement;
-import parquet.format.Statistics;
-import parquet.format.Type;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ParquetDecodingException;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
-import parquet.schema.TypeVisitor;
-import parquet.schema.Types;
-
-public class ParquetMetadataConverter {
-  private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
-
-  public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
-    List<BlockMetaData> blocks = parquetMetadata.getBlocks();
-    List<RowGroup> rowGroups = new ArrayList<RowGroup>();
-    int numRows = 0;
-    for (BlockMetaData block : blocks) {
-      numRows += block.getRowCount();
-      addRowGroup(parquetMetadata, rowGroups, block);
-    }
-    FileMetaData fileMetaData = new FileMetaData(
-        currentVersion,
-        toParquetSchema(parquetMetadata.getFileMetaData().getSchema()),
-        numRows,
-        rowGroups);
-
-    Set<Entry<String, String>> keyValues = parquetMetadata.getFileMetaData().getKeyValueMetaData().entrySet();
-    for (Entry<String, String> keyValue : keyValues) {
-      addKeyValue(fileMetaData, keyValue.getKey(), keyValue.getValue());
-    }
-
-    fileMetaData.setCreated_by(parquetMetadata.getFileMetaData().getCreatedBy());
-    return fileMetaData;
-  }
-
-  List<SchemaElement> toParquetSchema(MessageType schema) {
-    List<SchemaElement> result = new ArrayList<SchemaElement>();
-    addToList(result, schema);
-    return result;
-  }
-
-  private void addToList(final List<SchemaElement> result, parquet.schema.Type field) {
-    field.accept(new TypeVisitor() {
-      @Override
-      public void visit(PrimitiveType primitiveType) {
-        SchemaElement element = new SchemaElement(primitiveType.getName());
-        element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition()));
-        element.setType(getType(primitiveType.getPrimitiveTypeName()));
-        if (primitiveType.getOriginalType() != null) {
-          element.setConverted_type(getConvertedType(primitiveType.getOriginalType()));
-        }
-        if (primitiveType.getDecimalMetadata() != null) {
-          element.setPrecision(primitiveType.getDecimalMetadata().getPrecision());
-          element.setScale(primitiveType.getDecimalMetadata().getScale());
-        }
-        if (primitiveType.getTypeLength() > 0) {
-          element.setType_length(primitiveType.getTypeLength());
-        }
-        result.add(element);
-      }
-
-      @Override
-      public void visit(MessageType messageType) {
-        SchemaElement element = new SchemaElement(messageType.getName());
-        visitChildren(result, messageType.asGroupType(), element);
-      }
-
-      @Override
-      public void visit(GroupType groupType) {
-        SchemaElement element = new SchemaElement(groupType.getName());
-        element.setRepetition_type(toParquetRepetition(groupType.getRepetition()));
-        if (groupType.getOriginalType() != null) {
-          element.setConverted_type(getConvertedType(groupType.getOriginalType()));
-        }
-        visitChildren(result, groupType, element);
-      }
-
-      private void visitChildren(final List<SchemaElement> result,
-          GroupType groupType, SchemaElement element) {
-        element.setNum_children(groupType.getFieldCount());
-        result.add(element);
-        for (parquet.schema.Type field : groupType.getFields()) {
-          addToList(result, field);
-        }
-      }
-    });
-  }
-
-  private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
-    //rowGroup.total_byte_size = ;
-    List<ColumnChunkMetaData> columns = block.getColumns();
-    List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
-    for (ColumnChunkMetaData columnMetaData : columns) {
-      ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPageOffset()); // verify this is the right offset
-      columnChunk.file_path = block.getPath(); // they are in the same file for now
-      columnChunk.meta_data = new parquet.format.ColumnMetaData(
-          getType(columnMetaData.getType()),
-          toFormatEncodings(columnMetaData.getEncodings()),
-          Arrays.asList(columnMetaData.getPath().toArray()),
-          columnMetaData.getCodec().getParquetCompressionCodec(),
-          columnMetaData.getValueCount(),
-          columnMetaData.getTotalUncompressedSize(),
-          columnMetaData.getTotalSize(),
-          columnMetaData.getFirstDataPageOffset());
-      columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
-      if (!columnMetaData.getStatistics().isEmpty()) {
-        columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
-      }
-//      columnChunk.meta_data.index_page_offset = ;
-//      columnChunk.meta_data.key_value_metadata = ; // nothing yet
-
-      parquetColumns.add(columnChunk);
-    }
-    RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
-    rowGroups.add(rowGroup);
-  }
-
-  private List<Encoding> toFormatEncodings(Set<parquet.column.Encoding> encodings) {
-    List<Encoding> converted = new ArrayList<Encoding>(encodings.size());
-    for (parquet.column.Encoding encoding : encodings) {
-      converted.add(getEncoding(encoding));
-    }
-    return converted;
-  }
-
-  private static final class EncodingList {
-
-    private final Set<parquet.column.Encoding> encodings;
-
-    public EncodingList(Set<parquet.column.Encoding> encodings) {
-      this.encodings = encodings;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj instanceof EncodingList) {
-        Set<parquet.column.Encoding> other = ((EncodingList)obj).encodings;
-        return other.size() == encodings.size() && encodings.containsAll(other);
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = 1;
-      for (parquet.column.Encoding element : encodings)
-        result = 31 * result + (element == null ? 0 : element.hashCode());
-      return result;
-    }
-  }
-
-  private Map<EncodingList, Set<parquet.column.Encoding>> encodingLists = new HashMap<EncodingList, Set<parquet.column.Encoding>>();
-
-  private Set<parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
-    Set<parquet.column.Encoding> converted = new HashSet<parquet.column.Encoding>();
-    for (Encoding encoding : encodings) {
-      converted.add(getEncoding(encoding));
-    }
-    converted = Collections.unmodifiableSet(converted);
-    EncodingList key = new EncodingList(converted);
-    Set<parquet.column.Encoding> cached = encodingLists.get(key);
-    if (cached == null) {
-      cached = converted;
-      encodingLists.put(key, cached);
-    }
-    return cached;
-  }
-
-  public parquet.column.Encoding getEncoding(Encoding encoding) {
-    return parquet.column.Encoding.valueOf(encoding.name());
-  }
-
-  public Encoding getEncoding(parquet.column.Encoding encoding) {
-    return Encoding.valueOf(encoding.name());
-  }
-
-  public static Statistics toParquetStatistics(parquet.column.statistics.Statistics statistics) {
-    Statistics stats = new Statistics();
-    if (!statistics.isEmpty()) {
-      stats.setNull_count(statistics.getNumNulls());
-      if(statistics.hasNonNullValue()) {
-        stats.setMax(statistics.getMaxBytes());
-        stats.setMin(statistics.getMinBytes());
-     }
-    }
-    return stats;
-  }
-
-  public static parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) {
-    // create stats object based on the column type
-    parquet.column.statistics.Statistics stats = parquet.column.statistics.Statistics.getStatsBasedOnType(type);
-    // If no statistics were written to the footer, create an empty Statistics object and return it
-    if (statistics != null) {
-      if (statistics.isSetMax() && statistics.isSetMin()) {
-        stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array());
-      }
-      stats.setNumNulls(statistics.null_count);
-    }
-    return stats;
-  }
-
-  public PrimitiveTypeName getPrimitive(Type type) {
-    switch (type) {
-      case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
-        return PrimitiveTypeName.BINARY;
-      case INT64:
-        return PrimitiveTypeName.INT64;
-      case INT32:
-        return PrimitiveTypeName.INT32;
-      case BOOLEAN:
-        return PrimitiveTypeName.BOOLEAN;
-      case FLOAT:
-        return PrimitiveTypeName.FLOAT;
-      case DOUBLE:
-        return PrimitiveTypeName.DOUBLE;
-      case INT96:
-        return PrimitiveTypeName.INT96;
-      case FIXED_LEN_BYTE_ARRAY:
-        return PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
-      default:
-        throw new RuntimeException("Unknown type " + type);
-    }
-  }
-
-  Type getType(PrimitiveTypeName type) {
-    switch (type) {
-      case INT64:
-        return Type.INT64;
-      case INT32:
-        return Type.INT32;
-      case BOOLEAN:
-        return Type.BOOLEAN;
-      case BINARY:
-        return Type.BYTE_ARRAY;
-      case FLOAT:
-        return Type.FLOAT;
-      case DOUBLE:
-        return Type.DOUBLE;
-      case INT96:
-        return Type.INT96;
-      case FIXED_LEN_BYTE_ARRAY:
-        return Type.FIXED_LEN_BYTE_ARRAY;
-      default:
-        throw new RuntimeException("Unknown primitive type " + type);
-    }
-  }
-
-  OriginalType getOriginalType(ConvertedType type) {
-    switch (type) {
-      case UTF8:
-        return OriginalType.UTF8;
-      case MAP:
-        return OriginalType.MAP;
-      case MAP_KEY_VALUE:
-        return OriginalType.MAP_KEY_VALUE;
-      case LIST:
-        return OriginalType.LIST;
-      case ENUM:
-        return OriginalType.ENUM;
-      case DECIMAL:
-        return OriginalType.DECIMAL;
-      case DATE:
-        return OriginalType.DATE;
-      case TIME_MILLIS:
-        return OriginalType.TIME_MILLIS;
-      case TIMESTAMP_MILLIS:
-        return OriginalType.TIMESTAMP_MILLIS;
-      case INTERVAL:
-        return OriginalType.INTERVAL;
-      case INT_8:
-        return OriginalType.INT_8;
-      case INT_16:
-        return OriginalType.INT_16;
-      case INT_32:
-        return OriginalType.INT_32;
-      case INT_64:
-        return OriginalType.INT_64;
-      case UINT_8:
-        return OriginalType.UINT_8;
-      case UINT_16:
-        return OriginalType.UINT_16;
-      case UINT_32:
-        return OriginalType.UINT_32;
-      case UINT_64:
-        return OriginalType.UINT_64;
-      case JSON:
-        return OriginalType.JSON;
-      case BSON:
-        return OriginalType.BSON;
-      default:
-        throw new RuntimeException("Unknown converted type " + type);
-    }
-  }
-
-  ConvertedType getConvertedType(OriginalType type) {
-    switch (type) {
-      case UTF8:
-        return ConvertedType.UTF8;
-      case MAP:
-        return ConvertedType.MAP;
-      case MAP_KEY_VALUE:
-        return ConvertedType.MAP_KEY_VALUE;
-      case LIST:
-        return ConvertedType.LIST;
-      case ENUM:
-        return ConvertedType.ENUM;
-      case DECIMAL:
-        return ConvertedType.DECIMAL;
-      case DATE:
-        return ConvertedType.DATE;
-      case TIME_MILLIS:
-        return ConvertedType.TIME_MILLIS;
-      case TIMESTAMP_MILLIS:
-        return ConvertedType.TIMESTAMP_MILLIS;
-      case INTERVAL:
-        return ConvertedType.INTERVAL;
-      case INT_8:
-        return ConvertedType.INT_8;
-      case INT_16:
-        return ConvertedType.INT_16;
-      case INT_32:
-        return ConvertedType.INT_32;
-      case INT_64:
-        return ConvertedType.INT_64;
-      case UINT_8:
-        return ConvertedType.UINT_8;
-      case UINT_16:
-        return ConvertedType.UINT_16;
-      case UINT_32:
-        return ConvertedType.UINT_32;
-      case UINT_64:
-        return ConvertedType.UINT_64;
-      case JSON:
-        return ConvertedType.JSON;
-      case BSON:
-        return ConvertedType.BSON;
-      default:
-        throw new RuntimeException("Unknown original type " + type);
-     }
-   }
-
-  private void addKeyValue(FileMetaData fileMetaData, String key, String value) {
-    KeyValue keyValue = new KeyValue(key);
-    keyValue.value = value;
-    fileMetaData.addToKey_value_metadata(keyValue);
-  }
-
-  private static interface MetadataFilterVisitor<T, E extends Throwable> {
-    T visit(NoFilter filter) throws E;
-    T visit(SkipMetadataFilter filter) throws E;
-    T visit(RangeMetadataFilter filter) throws E;
-  }
-
-  public abstract static class MetadataFilter {
-    private MetadataFilter() {}
-    abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
-  }
-  public static final MetadataFilter NO_FILTER = new NoFilter();
-  public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
-  /**
-   * [ startOffset, endOffset )
-   * @param startOffset
-   * @param endOffset
-   * @return the filter
-   */
-  public static final MetadataFilter range(long startOffset, long endOffset) {
-    return new RangeMetadataFilter(startOffset, endOffset);
-  }
-  private static final class NoFilter extends MetadataFilter {
-    private NoFilter() {}
-    @Override
-    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
-      return visitor.visit(this);
-    }
-    @Override
-    public String toString() {
-      return "NO_FILTER";
-    }
-  }
-  private static final class SkipMetadataFilter extends MetadataFilter {
-    private SkipMetadataFilter() {}
-    @Override
-    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
-      return visitor.visit(this);
-    }
-    @Override
-    public String toString() {
-      return "SKIP_ROW_GROUPS";
-    }
-  }
-  /**
-   * [ startOffset, endOffset )
-   * @author Julien Le Dem
-   */
-  static final class RangeMetadataFilter extends MetadataFilter {
-    final long startOffset;
-    final long endOffset;
-    RangeMetadataFilter(long startOffset, long endOffset) {
-      super();
-      this.startOffset = startOffset;
-      this.endOffset = endOffset;
-    }
-    @Override
-    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
-      return visitor.visit(this);
-    }
-    boolean contains(long offset) {
-      return offset >= this.startOffset && offset < this.endOffset;
-    }
-    @Override
-    public String toString() {
-      return "range(s:" + startOffset + ", e:" + endOffset + ")";
-    }
-  }
-
-  @Deprecated
-  public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
-    return readParquetMetadata(from, NO_FILTER);
-  }
-
-  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
-    List<RowGroup> rowGroups = metaData.getRow_groups();
-    List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
-    for (RowGroup rowGroup : rowGroups) {
-      long totalSize = 0;
-      long startIndex = getOffset(rowGroup.getColumns().get(0));
-      for (ColumnChunk col : rowGroup.getColumns()) {
-        totalSize += col.getMeta_data().getTotal_compressed_size();
-      }
-      long midPoint = startIndex + totalSize / 2;
-      if (filter.contains(midPoint)) {
-        newRowGroups.add(rowGroup);
-      }
-    }
-    metaData.setRow_groups(newRowGroups);
-    return metaData;
-  }
-
-  static long getOffset(RowGroup rowGroup) {
-    return getOffset(rowGroup.getColumns().get(0));
-  }
-  static long getOffset(ColumnChunk columnChunk) {
-    ColumnMetaData md = columnChunk.getMeta_data();
-    long offset = md.getData_page_offset();
-    if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
-      offset = md.getDictionary_page_offset();
-    }
-    return offset;
-  }
-
-  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
-    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
-      @Override
-      public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from);
-      }
-      @Override
-      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true);
-      }
-      @Override
-      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-        return filterFileMetaData(readFileMetaData(from), filter);
-      }
-    });
-    if (Log.DEBUG) LOG.debug(fileMetaData);
-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
-    if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
-    return parquetMetadata;
-  }
-
-  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
-    MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-    List<RowGroup> row_groups = parquetMetadata.getRow_groups();
-    if (row_groups != null) {
-      for (RowGroup rowGroup : row_groups) {
-        BlockMetaData blockMetaData = new BlockMetaData();
-        blockMetaData.setRowCount(rowGroup.getNum_rows());
-        blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
-        List<ColumnChunk> columns = rowGroup.getColumns();
-        String filePath = columns.get(0).getFile_path();
-        for (ColumnChunk columnChunk : columns) {
-          if ((filePath == null && columnChunk.getFile_path() != null)
-              || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
-            throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
-          }
-          parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
-          ColumnPath path = getPath(metaData);
-          ColumnChunkMetaData column = ColumnChunkMetaData.get(
-              path,
-              messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
-              CompressionCodecName.fromParquet(metaData.codec),
-              fromFormatEncodings(metaData.encodings),
-              fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
-              metaData.data_page_offset,
-              metaData.dictionary_page_offset,
-              metaData.num_values,
-              metaData.total_compressed_size,
-              metaData.total_uncompressed_size);
-          // TODO
-          // index_page_offset
-          // key_value_metadata
-          blockMetaData.addColumn(column);
-        }
-        blockMetaData.setPath(filePath);
-        blocks.add(blockMetaData);
-      }
-    }
-    Map<String, String> keyValueMetaData = new HashMap<String, String>();
-    List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();
-    if (key_value_metadata != null) {
-      for (KeyValue keyValue : key_value_metadata) {
-        keyValueMetaData.put(keyValue.key, keyValue.value);
-      }
-    }
-    return new ParquetMetadata(
-        new parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by()),
-        blocks);
-  }
-
-  private ColumnPath getPath(parquet.format.ColumnMetaData metaData) {
-    String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
-    return ColumnPath.get(path);
-  }
-
-  MessageType fromParquetSchema(List<SchemaElement> schema) {
-    Iterator<SchemaElement> iterator = schema.iterator();
-    SchemaElement root = iterator.next();
-    Types.MessageTypeBuilder builder = Types.buildMessage();
-    buildChildren(builder, iterator, root.getNum_children());
-    return builder.named(root.name);
-  }
-
-  private void buildChildren(Types.GroupBuilder builder,
-                             Iterator<SchemaElement> schema,
-                             int childrenCount) {
-    for (int i = 0; i < childrenCount; i++) {
-      SchemaElement schemaElement = schema.next();
-
-      // Create Parquet Type.
-      Types.Builder childBuilder;
-      if (schemaElement.type != null) {
-        Types.PrimitiveBuilder primitiveBuilder = builder.primitive(
-            getPrimitive(schemaElement.type),
-            fromParquetRepetition(schemaElement.repetition_type));
-        if (schemaElement.isSetType_length()) {
-          primitiveBuilder.length(schemaElement.type_length);
-        }
-        if (schemaElement.isSetPrecision()) {
-          primitiveBuilder.precision(schemaElement.precision);
-        }
-        if (schemaElement.isSetScale()) {
-          primitiveBuilder.scale(schemaElement.scale);
-        }
-        childBuilder = primitiveBuilder;
-
-      } else {
-        childBuilder = builder.group(fromParquetRepetition(schemaElement.repetition_type));
-        buildChildren((Types.GroupBuilder) childBuilder, schema, schemaElement.num_children);
-      }
-
-      if (schemaElement.isSetConverted_type()) {
-        childBuilder.as(getOriginalType(schemaElement.converted_type));
-      }
-      if (schemaElement.isSetField_id()) {
-        childBuilder.id(schemaElement.field_id);
-      }
-
-      childBuilder.named(schemaElement.name);
-    }
-  }
-
-  FieldRepetitionType toParquetRepetition(Repetition repetition) {
-    return FieldRepetitionType.valueOf(repetition.name());
-  }
-
-  Repetition fromParquetRepetition(FieldRepetitionType repetition) {
-    return Repetition.valueOf(repetition.name());
-  }
-
-  @Deprecated
-  public void writeDataPageHeader(
-      int uncompressedSize,
-      int compressedSize,
-      int valueCount,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding,
-      OutputStream to) throws IOException {
-    writePageHeader(newDataPageHeader(uncompressedSize,
-                                      compressedSize,
-                                      valueCount,
-                                      new parquet.column.statistics.BooleanStatistics(),
-                                      rlEncoding,
-                                      dlEncoding,
-                                      valuesEncoding), to);
-  }
-
-  public void writeDataPageHeader(
-      int uncompressedSize,
-      int compressedSize,
-      int valueCount,
-      parquet.column.statistics.Statistics statistics,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding,
-      OutputStream to) throws IOException {
-    writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding), to);
-  }
-
-  private PageHeader newDataPageHeader(
-      int uncompressedSize, int compressedSize,
-      int valueCount,
-      parquet.column.statistics.Statistics statistics,
-      parquet.column.Encoding rlEncoding,
-      parquet.column.Encoding dlEncoding,
-      parquet.column.Encoding valuesEncoding) {
-    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
-    // TODO: pageHeader.crc = ...;
-    pageHeader.setData_page_header(new DataPageHeader(
-        valueCount,
-        getEncoding(valuesEncoding),
-        getEncoding(dlEncoding),
-        getEncoding(rlEncoding)));
-    if (!statistics.isEmpty()) {
-      pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
-    }
-    return pageHeader;
-  }
-
-  public void writeDataPageV2Header(
-      int uncompressedSize, int compressedSize,
-      int valueCount, int nullCount, int rowCount,
-      parquet.column.statistics.Statistics statistics,
-      parquet.column.Encoding dataEncoding,
-      int rlByteLength, int dlByteLength,
-      OutputStream to) throws IOException {
-    writePageHeader(
-        newDataPageV2Header(
-            uncompressedSize, compressedSize,
-            valueCount, nullCount, rowCount,
-            statistics,
-            dataEncoding,
-            rlByteLength, dlByteLength), to);
-  }
-
-  private PageHeader newDataPageV2Header(
-      int uncompressedSize, int compressedSize,
-      int valueCount, int nullCount, int rowCount,
-      parquet.column.statistics.Statistics<?> statistics,
-      parquet.column.Encoding dataEncoding,
-      int rlByteLength, int dlByteLength) {
-    // TODO: pageHeader.crc = ...;
-    DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
-        valueCount, nullCount, rowCount,
-        getEncoding(dataEncoding),
-        dlByteLength, rlByteLength);
-    if (!statistics.isEmpty()) {
-      dataPageHeaderV2.setStatistics(toParquetStatistics(statistics));
-    }
-    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
-    pageHeader.setData_page_header_v2(dataPageHeaderV2);
-    return pageHeader;
-  }
-
-  public void writeDictionaryPageHeader(
-      int uncompressedSize, int compressedSize, int valueCount,
-      parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
-    PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
-    pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
-    writePageHeader(pageHeader, to);
-  }
-
-}
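
A minimal sketch of the footer-reading entry point and the half-open range filter defined above; the stream handling and offsets are illustrative:

    import java.io.IOException;
    import java.io.InputStream;

    import parquet.format.converter.ParquetMetadataConverter;
    import parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
    import parquet.hadoop.metadata.ParquetMetadata;

    public class MetadataFilterSketch {
      // Reads footer metadata, keeping only the row groups whose midpoint falls
      // inside this split's byte range [start, end).
      static ParquetMetadata footerForSplit(InputStream footer, long start, long end)
          throws IOException {
        MetadataFilter filter = ParquetMetadataConverter.range(start, end);
        return new ParquetMetadataConverter().readParquetMetadata(footer, filter);
      }
    }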

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java b/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java
deleted file mode 100644
index 0e0653e..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import parquet.ParquetRuntimeException;
-
-/**
- * Thrown when the input/output formats are misconfigured
- *
- * @author Julien Le Dem
- *
- */
-public class BadConfigurationException extends ParquetRuntimeException {
-  private static final long serialVersionUID = 1L;
-
-  public BadConfigurationException() {
-  }
-
-  public BadConfigurationException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public BadConfigurationException(String message) {
-    super(message);
-  }
-
-  public BadConfigurationException(Throwable cause) {
-    super(cause);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
deleted file mode 100644
index df82c98..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-class CodecFactory {
-
-  public class BytesDecompressor {
-
-    private final CompressionCodec codec;
-    private final Decompressor decompressor;
-
-    public BytesDecompressor(CompressionCodec codec) {
-      this.codec = codec;
-      if (codec != null) {
-        decompressor = CodecPool.getDecompressor(codec);
-      } else {
-        decompressor = null;
-      }
-    }
-
-    public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
-      final BytesInput decompressed;
-      if (codec != null) {
-        decompressor.reset();
-        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
-        decompressed = BytesInput.from(is, uncompressedSize);
-      } else {
-        decompressed = bytes;
-      }
-      return decompressed;
-    }
-
-    private void release() {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-      }
-    }
-  }
-
-  /**
-   * Encapsulates the logic around hadoop compression
-   *
-   * @author Julien Le Dem
-   *
-   */
-  public static class BytesCompressor {
-
-    private final CompressionCodec codec;
-    private final Compressor compressor;
-    private final ByteArrayOutputStream compressedOutBuffer;
-    private final CompressionCodecName codecName;
-
-    public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
-      this.codecName = codecName;
-      this.codec = codec;
-      if (codec != null) {
-        this.compressor = CodecPool.getCompressor(codec);
-        this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
-      } else {
-        this.compressor = null;
-        this.compressedOutBuffer = null;
-      }
-    }
-
-    public BytesInput compress(BytesInput bytes) throws IOException {
-      final BytesInput compressedBytes;
-      if (codec == null) {
-        compressedBytes = bytes;
-      } else {
-        compressedOutBuffer.reset();
-        if (compressor != null) {
-          // null compressor for non-native gzip
-          compressor.reset();
-        }
-        CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
-        bytes.writeAllTo(cos);
-        cos.finish();
-        cos.close();
-        compressedBytes = BytesInput.from(compressedOutBuffer);
-      }
-      return compressedBytes;
-    }
-
-    private void release() {
-      if (compressor != null) {
-        CodecPool.returnCompressor(compressor);
-      }
-    }
-
-    public CompressionCodecName getCodecName() {
-      return codecName;
-    }
-
-  }
-
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
-  private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
-  private final Configuration configuration;
-
-  public CodecFactory(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  /**
-   *
-   * @param codecName the requested codec
-   * @return the corresponding hadoop codec. null if UNCOMPRESSED
-   */
-  private CompressionCodec getCodec(CompressionCodecName codecName) {
-    String codecClassName = codecName.getHadoopCompressionCodecClassName();
-    if (codecClassName == null) {
-      return null;
-    }
-    CompressionCodec codec = codecByName.get(codecClassName);
-    if (codec != null) {
-      return codec;
-    }
-
-    try {
-      Class<?> codecClass = Class.forName(codecClassName);
-      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
-      codecByName.put(codecClassName, codec);
-      return codec;
-    } catch (ClassNotFoundException e) {
-      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
-    }
-  }
-
-  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
-    BytesCompressor comp = compressors.get(codecName);
-    if (comp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      comp = new BytesCompressor(codecName, codec, pageSize);
-      compressors.put(codecName, comp);
-    }
-    return comp;
-  }
-
-  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
-    BytesDecompressor decomp = decompressors.get(codecName);
-    if (decomp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      decomp = new BytesDecompressor(codec);
-      decompressors.put(codecName, decomp);
-    }
-    return decomp;
-  }
-
-  public void release() {
-    for (BytesCompressor compressor : compressors.values()) {
-      compressor.release();
-    }
-    compressors.clear();
-    for (BytesDecompressor decompressor : decompressors.values()) {
-      decompressor.release();
-    }
-    decompressors.clear();
-  }
-}
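
A round-trip sketch of the compressor/decompressor pair above. CodecFactory is package-private, so the sketch assumes it lives in parquet.hadoop; the codec choice and buffer are illustrative, and BytesInput.from(byte[]) is assumed to be available from parquet-common.

    package parquet.hadoop; // CodecFactory is package-private

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import parquet.bytes.BytesInput;
    import parquet.hadoop.metadata.CompressionCodecName;

    public class CodecRoundTripSketch {
      // Compresses a page-sized buffer with gzip and decompresses it back.
      static byte[] roundTrip(byte[] page) throws IOException {
        CodecFactory codecs = new CodecFactory(new Configuration());
        try {
          CodecFactory.BytesCompressor compressor =
              codecs.getCompressor(CompressionCodecName.GZIP, page.length);
          BytesInput compressed = compressor.compress(BytesInput.from(page));
          return codecs.getDecompressor(CompressionCodecName.GZIP)
              .decompress(compressed, page.length)
              .toByteArray();
        } finally {
          codecs.release(); // return pooled (de)compressors
        }
      }
    }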

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
deleted file mode 100644
index 30349be..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import parquet.Ints;
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.hadoop.CodecFactory.BytesDecompressor;
-import parquet.io.ParquetDecodingException;
-
-/**
- * TODO: should this actually be called RowGroupImpl or something?
- * The name is kind of confusing since it references three different "entities"
- * in our format: columns, chunks, and pages
- *
- */
-class ColumnChunkPageReadStore implements PageReadStore {
-  private static final Log LOG = Log.getLog(ColumnChunkPageReadStore.class);
-
-  /**
-   * PageReader for a single column chunk. A column chunk contains
-   * several pages, which are yielded one by one in order.
-   *
-   * This implementation is provided with a list of pages, each of which
-   * is decompressed and passed through.
-   */
-  static final class ColumnChunkPageReader implements PageReader {
-
-    private final BytesDecompressor decompressor;
-    private final long valueCount;
-    private final List<DataPage> compressedPages;
-    private final DictionaryPage compressedDictionaryPage;
-
-    ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
-      this.decompressor = decompressor;
-      this.compressedPages = new LinkedList<DataPage>(compressedPages);
-      this.compressedDictionaryPage = compressedDictionaryPage;
-      int count = 0;
-      for (DataPage p : compressedPages) {
-        count += p.getValueCount();
-      }
-      this.valueCount = count;
-    }
-
-    @Override
-    public long getTotalValueCount() {
-      return valueCount;
-    }
-
-    @Override
-    public DataPage readPage() {
-      if (compressedPages.isEmpty()) {
-        return null;
-      }
-      DataPage compressedPage = compressedPages.remove(0);
-      return compressedPage.accept(new DataPage.Visitor<DataPage>() {
-        @Override
-        public DataPage visit(DataPageV1 dataPageV1) {
-          try {
-            return new DataPageV1(
-                decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()),
-                dataPageV1.getValueCount(),
-                dataPageV1.getUncompressedSize(),
-                dataPageV1.getStatistics(),
-                dataPageV1.getRlEncoding(),
-                dataPageV1.getDlEncoding(),
-                dataPageV1.getValueEncoding());
-          } catch (IOException e) {
-            throw new ParquetDecodingException("could not decompress page", e);
-          }
-        }
-
-        @Override
-        public DataPage visit(DataPageV2 dataPageV2) {
-          if (!dataPageV2.isCompressed()) {
-            return dataPageV2;
-          }
-          try {
-            int uncompressedSize = Ints.checkedCast(
-                dataPageV2.getUncompressedSize()
-                - dataPageV2.getDefinitionLevels().size()
-                - dataPageV2.getRepetitionLevels().size());
-            return DataPageV2.uncompressed(
-                dataPageV2.getRowCount(),
-                dataPageV2.getNullCount(),
-                dataPageV2.getValueCount(),
-                dataPageV2.getRepetitionLevels(),
-                dataPageV2.getDefinitionLevels(),
-                dataPageV2.getDataEncoding(),
-                decompressor.decompress(dataPageV2.getData(), uncompressedSize),
-                dataPageV2.getStatistics()
-                );
-          } catch (IOException e) {
-            throw new ParquetDecodingException("could not decompress page", e);
-          }
-        }
-      });
-    }
-
-    @Override
-    public DictionaryPage readDictionaryPage() {
-      if (compressedDictionaryPage == null) {
-        return null;
-      }
-      try {
-        return new DictionaryPage(
-            decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
-            compressedDictionaryPage.getDictionarySize(),
-            compressedDictionaryPage.getEncoding());
-      } catch (IOException e) {
-        throw new ParquetDecodingException("could not decompress dictionary page", e);
-      }
-    }
-  }
-
-  private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
-  private final long rowCount;
-
-  public ColumnChunkPageReadStore(long rowCount) {
-    this.rowCount = rowCount;
-  }
-
-  @Override
-  public long getRowCount() {
-    return rowCount;
-  }
-
-  @Override
-  public PageReader getPageReader(ColumnDescriptor path) {
-    if (!readers.containsKey(path)) {
-      throw new IllegalArgumentException(path + " is not in the store: " + readers.keySet() + " " + rowCount);
-    }
-    return readers.get(path);
-  }
-
-  void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) {
-    if (readers.put(path, reader) != null) {
-      throw new RuntimeException(path+ " was added twice");
-    }
-  }
-
-}
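
For context only (not part of the patch): a sketch of how a caller might drain one column chunk through the public PageReadStore/PageReader interfaces implemented above, for example on a row group handed back by ParquetFileReader.readNextRowGroup(). Only methods visible in this diff are used.

package parquet.hadoop;

import parquet.column.ColumnDescriptor;
import parquet.column.page.DataPage;
import parquet.column.page.DictionaryPage;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;

public class PageDrainSketch {

  /** Reads every page of one column chunk and returns the number of values seen. */
  public static long countValues(PageReadStore rowGroup, ColumnDescriptor column) {
    PageReader pages = rowGroup.getPageReader(column);

    // the dictionary page, if any, comes back already decompressed
    DictionaryPage dict = pages.readDictionaryPage();
    System.out.println("dictionary present: " + (dict != null));

    long seen = 0;
    for (DataPage page = pages.readPage(); page != null; page = pages.readPage()) {
      seen += page.getValueCount();
    }
    // should match pages.getTotalValueCount()
    return seen;
  }
}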

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
deleted file mode 100644
index e3bab0d..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.INFO;
-import static parquet.column.statistics.Statistics.getStatsBasedOnType;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.ConcatenatingByteArrayCollector;
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-
-class ColumnChunkPageWriteStore implements PageWriteStore {
-  private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
-
-  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-
-  private static final class ColumnChunkPageWriter implements PageWriter {
-
-    private final ColumnDescriptor path;
-    private final BytesCompressor compressor;
-
-    private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
-    private final ConcatenatingByteArrayCollector buf;
-    private DictionaryPage dictionaryPage;
-
-    private long uncompressedLength;
-    private long compressedLength;
-    private long totalValueCount;
-    private int pageCount;
-
-    private Set<Encoding> encodings = new HashSet<Encoding>();
-
-    private Statistics totalStatistics;
-
-    private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) {
-      this.path = path;
-      this.compressor = compressor;
-      this.buf = new ConcatenatingByteArrayCollector();
-      this.totalStatistics = getStatsBasedOnType(this.path.getType());
-    }
-
-    @Override
-    public void writePage(BytesInput bytes,
-                          int valueCount,
-                          Statistics statistics,
-                          Encoding rlEncoding,
-                          Encoding dlEncoding,
-                          Encoding valuesEncoding) throws IOException {
-      long uncompressedSize = bytes.size();
-      if (uncompressedSize > Integer.MAX_VALUE) {
-        throw new ParquetEncodingException(
-            "Cannot write page larger than Integer.MAX_VALUE bytes: " +
-            uncompressedSize);
-      }
-      BytesInput compressedBytes = compressor.compress(bytes);
-      long compressedSize = compressedBytes.size();
-      if (compressedSize > Integer.MAX_VALUE) {
-        throw new ParquetEncodingException(
-            "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
-            + compressedSize);
-      }
-      tempOutputStream.reset();
-      parquetMetadataConverter.writeDataPageHeader(
-          (int)uncompressedSize,
-          (int)compressedSize,
-          valueCount,
-          statistics,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          tempOutputStream);
-      this.uncompressedLength += uncompressedSize;
-      this.compressedLength += compressedSize;
-      this.totalValueCount += valueCount;
-      this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
-      // by concatenating before collecting instead of collecting twice,
-      // we only allocate one buffer to copy into instead of multiple.
-      buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
-      encodings.add(rlEncoding);
-      encodings.add(dlEncoding);
-      encodings.add(valuesEncoding);
-    }
-
-    @Override
-    public void writePageV2(
-        int rowCount, int nullCount, int valueCount,
-        BytesInput repetitionLevels, BytesInput definitionLevels,
-        Encoding dataEncoding, BytesInput data,
-        Statistics<?> statistics) throws IOException {
-      int rlByteLength = toIntWithCheck(repetitionLevels.size());
-      int dlByteLength = toIntWithCheck(definitionLevels.size());
-      int uncompressedSize = toIntWithCheck(
-          data.size() + repetitionLevels.size() + definitionLevels.size()
-      );
-      // TODO: decide if we compress
-      BytesInput compressedData = compressor.compress(data);
-      int compressedSize = toIntWithCheck(
-          compressedData.size() + repetitionLevels.size() + definitionLevels.size()
-      );
-      tempOutputStream.reset();
-      parquetMetadataConverter.writeDataPageV2Header(
-          uncompressedSize, compressedSize,
-          valueCount, nullCount, rowCount,
-          statistics,
-          dataEncoding,
-          rlByteLength,
-          dlByteLength,
-          tempOutputStream);
-      this.uncompressedLength += uncompressedSize;
-      this.compressedLength += compressedSize;
-      this.totalValueCount += valueCount;
-      this.pageCount += 1;
-      this.totalStatistics.mergeStatistics(statistics);
-
-      // by concatenating before collecting instead of collecting twice,
-      // we only allocate one buffer to copy into instead of multiple.
-      buf.collect(
-          BytesInput.concat(
-            BytesInput.from(tempOutputStream),
-            repetitionLevels,
-            definitionLevels,
-            compressedData)
-      );
-      encodings.add(dataEncoding);
-    }
-
-    private int toIntWithCheck(long size) {
-      if (size > Integer.MAX_VALUE) {
-        throw new ParquetEncodingException(
-            "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
-            size);
-      }
-      return (int)size;
-    }
-
-    @Override
-    public long getMemSize() {
-      return buf.size();
-    }
-
-    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
-      writer.startColumn(path, totalValueCount, compressor.getCodecName());
-      if (dictionaryPage != null) {
-        writer.writeDictionaryPage(dictionaryPage);
-        encodings.add(dictionaryPage.getEncoding());
-      }
-      writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
-      writer.endColumn();
-      if (INFO) {
-        LOG.info(
-            String.format(
-                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
-                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
-            + (dictionaryPage != null ? String.format(
-                    ", dic { %,d entries, %,dB raw, %,dB comp}",
-                    dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getBytes().size())
-                    : ""));
-      }
-      encodings.clear();
-      pageCount = 0;
-    }
-
-    @Override
-    public long allocatedSize() {
-      return buf.size();
-    }
-
-    @Override
-    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-      if (this.dictionaryPage != null) {
-        throw new ParquetEncodingException("Only one dictionary page is allowed");
-      }
-      BytesInput dictionaryBytes = dictionaryPage.getBytes();
-      int uncompressedSize = (int)dictionaryBytes.size();
-      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
-      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
-    }
-
-    @Override
-    public String memUsageString(String prefix) {
-      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
-    }
-  }
-
-  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
-  private final MessageType schema;
-
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
-    this.schema = schema;
-    for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
-    }
-  }
-
-  @Override
-  public PageWriter getPageWriter(ColumnDescriptor path) {
-    return writers.get(path);
-  }
-
-  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
-    for (ColumnDescriptor path : schema.getColumns()) {
-      ColumnChunkPageWriter pageWriter = writers.get(path);
-      pageWriter.writeToFileWriter(writer);
-    }
-  }
-
-}
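
For context only (not part of the patch): a sketch of the write path this store supports. Each column gets a PageWriter that buffers compressed pages in memory until the row group is flushed to a ParquetFileWriter. It assumes the caller has already opened the file and started a row group (calls not shown in this diff), that the page bytes are already encoded, and it uses Encoding.PLAIN and an empty Statistics object purely as placeholders.

package parquet.hadoop;

import java.io.IOException;
import java.util.Map;

import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.column.page.PageWriter;
import parquet.column.statistics.Statistics;
import parquet.schema.MessageType;

public class PageWriteSketch {

  public static void writeRowGroup(ParquetFileWriter fileWriter,
                                   MessageType schema,
                                   CodecFactory.BytesCompressor compressor,
                                   Map<ColumnDescriptor, BytesInput> encodedPageBytes,
                                   int valueCount,
                                   int pageSize) throws IOException {
    ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, pageSize);

    for (ColumnDescriptor column : schema.getColumns()) {
      PageWriter pageWriter = store.getPageWriter(column);
      Statistics stats = Statistics.getStatsBasedOnType(column.getType());
      // one already-encoded v1 data page per column, compressed as it is written
      pageWriter.writePage(encodedPageBytes.get(column), valueCount, stats,
          Encoding.PLAIN, Encoding.PLAIN, Encoding.PLAIN);
    }

    // writes each buffered column chunk (dictionary + data pages) to the file
    store.flushToFileWriter(fileWriter);
  }
}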

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java b/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java
deleted file mode 100644
index 980a120..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-
-import org.apache.hadoop.fs.Path;
-
-import parquet.hadoop.metadata.ParquetMetadata;
-
-/**
- *
- * Represents the footer for a given file
- *
- * @author Julien Le Dem
- *
- */
-public class Footer {
-
-  private final Path file;
-
-  private final ParquetMetadata parquetMetadata;
-
-  public Footer(Path file, ParquetMetadata parquetMetadata) {
-    super();
-    this.file = file;
-    this.parquetMetadata = parquetMetadata;
-  }
-
-  public Path getFile() {
-    return file;
-  }
-
-  public ParquetMetadata getParquetMetadata() {
-    return parquetMetadata;
-  }
-
-  @Override
-  public String toString() {
-    return "Footer{"+file+", "+parquetMetadata+"}";
-  }
-}
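
For context only (not part of the patch): Footer is a plain holder pairing a file path with its parsed metadata; a trivial sketch of inspecting a list of footers using only the accessors above.

package parquet.hadoop;

import java.util.List;

public class FooterPrinter {

  public static void print(List<Footer> footers) {
    for (Footer footer : footers) {
      // relies only on the accessors defined above
      System.out.println(footer.getFile() + " -> " + footer.getParquetMetadata());
    }
  }
}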

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
deleted file mode 100644
index 8ae7c57..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/* 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.PageReadStore;
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
-
-class InternalParquetRecordReader<T> {
-  private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
-
-  private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
-  private final Filter filter;
-
-  private MessageType requestedSchema;
-  private MessageType fileSchema;
-  private int columnCount;
-  private final ReadSupport<T> readSupport;
-
-  private RecordMaterializer<T> recordConverter;
-
-  private T currentValue;
-  private long total;
-  private long current = 0;
-  private int currentBlock = -1;
-  private ParquetFileReader reader;
-  private parquet.io.RecordReader<T> recordReader;
-  private boolean strictTypeChecking;
-
-  private long totalTimeSpentReadingBytes;
-  private long totalTimeSpentProcessingRecords;
-  private long startedAssemblingCurrentBlockAt;
-
-  private long totalCountLoadedSoFar = 0;
-
-  private Path file;
-
-  /**
-   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
-   * @param filter for filtering individual records
-   */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
-    this.readSupport = readSupport;
-    this.filter = checkNotNull(filter, "filter");
-  }
-
-  /**
-   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
-   */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport) {
-    this(readSupport, FilterCompat.NOOP);
-  }
-
-  /**
-   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
-   * @param filter Optional filter for only returning matching records.
-   * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)}
-   */
-  @Deprecated
-  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
-    this(readSupport, FilterCompat.get(filter));
-  }
-
-  private void checkRead() throws IOException {
-    if (current == totalCountLoadedSoFar) {
-      if (current != 0) {
-        totalTimeSpentProcessingRecords += (System.currentTimeMillis() - startedAssemblingCurrentBlockAt);
-        if (Log.INFO) {
-            LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
-            final long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
-            if (totalTime != 0) {
-                final long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
-                final long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
-                LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
-            }
-        }
-      }
-
-      LOG.info("at row " + current + ". reading next block");
-      long t0 = System.currentTimeMillis();
-      PageReadStore pages = reader.readNextRowGroup();
-      if (pages == null) {
-        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
-      }
-      long timeSpentReading = System.currentTimeMillis() - t0;
-      totalTimeSpentReadingBytes += timeSpentReading;
-      BenchmarkCounter.incrementTime(timeSpentReading);
-      if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
-      if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
-      MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
-      recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
-      startedAssemblingCurrentBlockAt = System.currentTimeMillis();
-      totalCountLoadedSoFar += pages.getRowCount();
-      ++currentBlock;
-    }
-  }
-
-  public void close() throws IOException {
-    if (reader != null) {
-      reader.close();
-    }
-  }
-
-  public Void getCurrentKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  public T getCurrentValue() throws IOException,
-  InterruptedException {
-    return currentValue;
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    return (float) current / total;
-  }
-
-  public void initialize(MessageType fileSchema,
-      Map<String, String> fileMetadata,
-      Path file, List<BlockMetaData> blocks, Configuration configuration)
-      throws IOException {
-    // initialize a ReadContext for this file
-    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
-        configuration, toSetMultiMap(fileMetadata), fileSchema));
-    this.requestedSchema = readContext.getRequestedSchema();
-    this.fileSchema = fileSchema;
-    this.file = file;
-    this.columnCount = requestedSchema.getPaths().size();
-    this.recordConverter = readSupport.prepareForRead(
-        configuration, fileMetadata, fileSchema, readContext);
-    this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    reader = new ParquetFileReader(configuration, file, blocks, columns);
-    for (BlockMetaData block : blocks) {
-      total += block.getRowCount();
-    }
-    LOG.info("RecordReader initialized will read a total of " + total + " records.");
-  }
-
-  private boolean contains(GroupType group, String[] path, int index) {
-    if (index == path.length) {
-      return false;
-    }
-    if (group.containsField(path[index])) {
-      Type type = group.getType(path[index]);
-      if (type.isPrimitive()) {
-        return index + 1 == path.length;
-      } else {
-        return contains(type.asGroupType(), path, index + 1);
-      }
-    }
-    return false;
-  }
-
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    boolean recordFound = false;
-
-    while (!recordFound) {
-      // no more records left
-      if (current >= total) { return false; }
-
-      try {
-        checkRead();
-        currentValue = recordReader.read();
-        current++;
-        if (recordReader.shouldSkipCurrentRecord()) {
-          // this record is being filtered via the filter2 package
-          if (DEBUG) LOG.debug("skipping record");
-          continue;
-        }
-
-        if (currentValue == null) {
-          // only happens with FilteredRecordReader at end of block
-          current = totalCountLoadedSoFar;
-          if (DEBUG) LOG.debug("filtered record reader reached end of block");
-          continue;
-        }
-
-        recordFound = true;
-
-        if (DEBUG) LOG.debug("read value: " + currentValue);
-      } catch (RuntimeException e) {
-        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
-      }
-    }
-    return true;
-  }
-
-  private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
-    Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
-    for (Map.Entry<K, V> entry : map.entrySet()) {
-      Set<V> set = new HashSet<V>();
-      set.add(entry.getValue());
-      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
-    }
-    return Collections.unmodifiableMap(setMultiMap);
-  }
-
-}
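
For context only (not part of the patch): a sketch of the loop a wrapping RecordReader would drive against this class. It stays in the parquet.hadoop package because the class is package-private, and the schema, metadata and block list are assumed to come from the usual footer-reading machinery, which is not shown here.

package parquet.hadoop;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.filter2.compat.FilterCompat;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.schema.MessageType;

public class InternalReadLoopSketch {

  public static <T> long readAll(ReadSupport<T> readSupport,
                                 MessageType fileSchema,
                                 Map<String, String> fileMetadata,
                                 Path file,
                                 List<BlockMetaData> blocks,
                                 Configuration conf) throws IOException, InterruptedException {
    InternalParquetRecordReader<T> reader =
        new InternalParquetRecordReader<T>(readSupport, FilterCompat.NOOP);
    try {
      reader.initialize(fileSchema, fileMetadata, file, blocks, conf);
      long records = 0;
      while (reader.nextKeyValue()) {  // assembles row groups lazily via checkRead()
        T record = reader.getCurrentValue();
        records++;                     // a real reader would hand 'record' to the framework here
      }
      return records;
    } finally {
      reader.close();
    }
  }
}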

