http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
new file mode 100644
index 0000000..1540f03
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/counters/mapreduce/MapReduceCounterLoader.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.util.counters.mapreduce;
+
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
+import org.apache.parquet.hadoop.util.counters.CounterLoader;
+import org.apache.parquet.hadoop.util.counters.ICounter;
+
+/**
+ * Concrete factory for counters in the mapreduce API;
+ * returns a counter obtained through the mapreduce API unless the corresponding flag is explicitly set to false, in which case a NullCounter is returned
+ * @author Tianshuo Deng
+ */
+public class MapReduceCounterLoader implements CounterLoader {
+ private TaskInputOutputContext<?, ?, ?, ?> context;
+
+ public MapReduceCounterLoader(TaskInputOutputContext<?, ?, ?, ?> context) {
+ this.context = context;
+ }
+
+ @Override
+ public ICounter getCounterByNameAndFlag(String groupName, String counterName, String counterFlag) {
+ if (ContextUtil.getConfiguration(context).getBoolean(counterFlag, true)) {
+ return new MapReduceCounterAdapter(ContextUtil.getCounter(context, groupName, counterName));
+ } else {
+ return new BenchmarkCounter.NullCounter();
+ }
+ }
+}
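
For orientation, a minimal wiring sketch for the new loader above. The group name, counter name, and flag key below are illustrative placeholders, not the values BenchmarkCounter actually registers.

import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.parquet.hadoop.util.counters.CounterLoader;
import org.apache.parquet.hadoop.util.counters.ICounter;
import org.apache.parquet.hadoop.util.counters.mapreduce.MapReduceCounterLoader;

public class CounterLoaderSketch {
  // Returns a Hadoop-backed counter unless the flag is explicitly set to false
  // in the job configuration, in which case the no-op NullCounter is returned.
  static ICounter bytesReadCounter(TaskInputOutputContext<?, ?, ?, ?> context) {
    CounterLoader loader = new MapReduceCounterLoader(context);
    return loader.getCounterByNameAndFlag("parquet", "bytesread", "parquet.counters.bytesread.enabled");
  }
}
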
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
deleted file mode 100644
index fbfda49..0000000
--- a/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.compat;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.filter2.compat.FilterCompat.NoOpFilter;
-import parquet.filter2.compat.FilterCompat.Visitor;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.filter2.predicate.SchemaCompatibilityValidator;
-import parquet.filter2.statisticslevel.StatisticsFilter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.schema.MessageType;
-
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Given a {@link Filter}, applies it to a list of BlockMetaData (row groups).
- * If the Filter is an {@link parquet.filter.UnboundRecordFilter} or the no-op filter,
- * no filtering will be performed.
- */
-public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
- private final List<BlockMetaData> blocks;
- private final MessageType schema;
-
- public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
- checkNotNull(filter, "filter");
- return filter.accept(new RowGroupFilter(blocks, schema));
- }
-
- private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
- this.blocks = checkNotNull(blocks, "blocks");
- this.schema = checkNotNull(schema, "schema");
- }
-
- @Override
- public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
- FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
-
- // check that the schema of the filter matches the schema of the file
- SchemaCompatibilityValidator.validate(filterPredicate, schema);
-
- List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();
-
- for (BlockMetaData block : blocks) {
- if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
- filteredBlocks.add(block);
- }
- }
-
- return filteredBlocks;
- }
-
- @Override
- public List<BlockMetaData> visit(FilterCompat.UnboundRecordFilterCompat unboundRecordFilterCompat) {
- return blocks;
- }
-
- @Override
- public List<BlockMetaData> visit(NoOpFilter noOpFilter) {
- return blocks;
- }
-}
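
For context, a sketch of how the deleted class above is typically driven. FilterApi.intColumn/eq and FilterCompat.get are assumed to be available under the old parquet.filter2 packages that this pre-rename file belongs to.

import java.util.List;
import parquet.filter2.compat.FilterCompat;
import parquet.filter2.compat.RowGroupFilter;
import parquet.filter2.predicate.FilterApi;
import parquet.filter2.predicate.FilterPredicate;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.schema.MessageType;

public class RowGroupFilterSketch {
  // Drops row groups whose statistics prove no record can have id == 7;
  // an UnboundRecordFilter or the no-op filter returns the list unchanged.
  static List<BlockMetaData> prune(List<BlockMetaData> blocks, MessageType schema) {
    FilterPredicate pred = FilterApi.eq(FilterApi.intColumn("id"), 7);
    return RowGroupFilter.filterRowGroups(FilterCompat.get(pred), blocks, schema);
  }
}
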
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
deleted file mode 100644
index 7a7cd06..0000000
--- a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.filter2.statisticslevel;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import parquet.column.statistics.Statistics;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.filter2.predicate.FilterPredicate;
-import parquet.filter2.predicate.Operators.And;
-import parquet.filter2.predicate.Operators.Column;
-import parquet.filter2.predicate.Operators.Eq;
-import parquet.filter2.predicate.Operators.Gt;
-import parquet.filter2.predicate.Operators.GtEq;
-import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
-import parquet.filter2.predicate.Operators.Lt;
-import parquet.filter2.predicate.Operators.LtEq;
-import parquet.filter2.predicate.Operators.Not;
-import parquet.filter2.predicate.Operators.NotEq;
-import parquet.filter2.predicate.Operators.Or;
-import parquet.filter2.predicate.Operators.UserDefined;
-import parquet.filter2.predicate.UserDefinedPredicate;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-
-import static parquet.Preconditions.checkArgument;
-import static parquet.Preconditions.checkNotNull;
-
-/**
- * Applies a {@link parquet.filter2.predicate.FilterPredicate} to statistics about a group of
- * records.
- *
- * Note: the supplied predicate must not contain any instances of the not() operator as this is not
- * supported by this filter.
- *
- * the supplied predicate should first be run through {@link parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
- * in a form that doesn't make use of the not() operator.
- *
- * the supplied predicate should also have already been run through
- * {@link parquet.filter2.predicate.SchemaCompatibilityValidator}
- * to make sure it is compatible with the schema of this file.
- *
- * Returns true if all the records represented by the statistics in the provided column metadata can be dropped,
- * false otherwise (including when it is not known, which is often the case).
- */
-// TODO: this belongs in the parquet-column project, but some of the classes here need to be moved too
-// TODO: (https://issues.apache.org/jira/browse/PARQUET-38)
-public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
-
- public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns) {
- checkNotNull(pred, "pred");
- checkNotNull(columns, "columns");
- return pred.accept(new StatisticsFilter(columns));
- }
-
- private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
-
- private StatisticsFilter(List<ColumnChunkMetaData> columnsList) {
- for (ColumnChunkMetaData chunk : columnsList) {
- columns.put(chunk.getPath(), chunk);
- }
- }
-
- private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
- ColumnChunkMetaData c = columns.get(columnPath);
- checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
- return c;
- }
-
- // is this column chunk composed entirely of nulls?
- // assumes the column chunk's statistics is not empty
- private boolean isAllNulls(ColumnChunkMetaData column) {
- return column.getStatistics().getNumNulls() == column.getValueCount();
- }
-
- // are there any nulls in this column chunk?
- // assumes the column chunk's statistics is not empty
- private boolean hasNulls(ColumnChunkMetaData column) {
- return column.getStatistics().getNumNulls() > 0;
- }
-
- @Override
- public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
- Column<T> filterColumn = eq.getColumn();
- T value = eq.getValue();
- ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
- Statistics<T> stats = columnChunk.getStatistics();
-
- if (stats.isEmpty()) {
- // we have no statistics available, we cannot drop any chunks
- return false;
- }
-
- if (value == null) {
- // we are looking for records where v eq(null)
- // so drop if there are no nulls in this chunk
- return !hasNulls(columnChunk);
- }
-
- if (isAllNulls(columnChunk)) {
- // we are looking for records where v eq(someNonNull)
- // and this is a column of all nulls, so drop it
- return true;
- }
-
- // drop if value < min || value > max
- return value.compareTo(stats.genericGetMin()) < 0 || value.compareTo(stats.genericGetMax()) > 0;
- }
-
- @Override
- public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
- Column<T> filterColumn = notEq.getColumn();
- T value = notEq.getValue();
- ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
- Statistics<T> stats = columnChunk.getStatistics();
-
- if (stats.isEmpty()) {
- // we have no statistics available, we cannot drop any chunks
- return false;
- }
-
- if (value == null) {
- // we are looking for records where v notEq(null)
- // so, if this is a column of all nulls, we can drop it
- return isAllNulls(columnChunk);
- }
-
- if (hasNulls(columnChunk)) {
- // we are looking for records where v notEq(someNonNull)
- // but this chunk contains nulls, we cannot drop it
- return false;
- }
-
- // drop if this is a column where min = max = value
- return value.compareTo(stats.genericGetMin()) == 0 && value.compareTo(stats.genericGetMax()) == 0;
- }
-
- @Override
- public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
- Column<T> filterColumn = lt.getColumn();
- T value = lt.getValue();
- ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
- Statistics<T> stats = columnChunk.getStatistics();
-
- if (stats.isEmpty()) {
- // we have no statistics available, we cannot drop any chunks
- return false;
- }
-
- if (isAllNulls(columnChunk)) {
- // we are looking for records where v < someValue
- // this chunk is all nulls, so we can drop it
- return true;
- }
-
- // drop if value <= min
- return value.compareTo(stats.genericGetMin()) <= 0;
- }
-
- @Override
- public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
- Column<T> filterColumn = ltEq.getColumn();
- T value = ltEq.getValue();
- ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
- Statistics<T> stats = columnChunk.getStatistics();
-
- if (stats.isEmpty()) {
- // we have no statistics available, we cannot drop any chunks
- return false;
- }
-
- if (isAllNulls(columnChunk)) {
- // we are looking for records where v <= someValue
- // this chunk is all nulls, so we can drop it
- return true;
- }
-
- // drop if value < min
- return value.compareTo(stats.genericGetMin()) < 0;
- }
-
- @Override
- public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
- Column<T> filterColumn = gt.getColumn();
- T value = gt.getValue();
- ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
- Statistics<T> stats = columnChunk.getStatistics();
-
- if (stats.isEmpty()) {
- // we have no statistics available, we cannot drop any chunks
- return false;
- }
-
- if (isAllNulls(columnChunk)) {
- // we are looking for records where v > someValue
- // this chunk is all nulls, so we can drop it
- return true;
- }
-
- // drop if value >= max
- return value.compareTo(stats.genericGetMax()) >= 0;
- }
-
- @Override
- public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
- Column<T> filterColumn = gtEq.getColumn();
- T value = gtEq.getValue();
- ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
- Statistics<T> stats = columnChunk.getStatistics();
-
- if (stats.isEmpty()) {
- // we have no statistics available, we cannot drop any chunks
- return false;
- }
-
- if (isAllNulls(columnChunk)) {
- // we are looking for records where v >= someValue
- // this chunk is all nulls, so we can drop it
- return true;
- }
-
-    // drop if value > max
- return value.compareTo(stats.genericGetMax()) > 0;
- }
-
- @Override
- public Boolean visit(And and) {
- // seems unintuitive to put an || not an && here but we can
- // drop a chunk of records if we know that either the left or
- // the right predicate agrees that no matter what we don't
- // need this chunk.
- return and.getLeft().accept(this) || and.getRight().accept(this);
- }
-
- @Override
- public Boolean visit(Or or) {
- // seems unintuitive to put an && not an || here
- // but we can only drop a chunk of records if we know that
- // both the left and right predicates agree that no matter what
- // we don't need this chunk.
- return or.getLeft().accept(this) && or.getRight().accept(this);
- }
-
- @Override
- public Boolean visit(Not not) {
- throw new IllegalArgumentException(
- "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
- }
-
- private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
- Column<T> filterColumn = ud.getColumn();
- ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
- U udp = ud.getUserDefinedPredicate();
- Statistics<T> stats = columnChunk.getStatistics();
-
- if (stats.isEmpty()) {
- // we have no statistics available, we cannot drop any chunks
- return false;
- }
-
- if (isAllNulls(columnChunk)) {
- // there is no min max, there is nothing
- // else we can say about this chunk, we
- // cannot drop it.
- return false;
- }
-
- parquet.filter2.predicate.Statistics<T> udpStats =
- new parquet.filter2.predicate.Statistics<T>(stats.genericGetMin(), stats.genericGetMax());
-
- if (inverted) {
- return udp.inverseCanDrop(udpStats);
- } else {
- return udp.canDrop(udpStats);
- }
- }
-
- @Override
- public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud) {
- return visit(ud, false);
- }
-
- @Override
- public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> lnud) {
- return visit(lnud.getUserDefined(), true);
- }
-
-}
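
A small usage sketch for the statistics-level filter above. FilterApi.longColumn/gt are assumed from the same filter2 package; the predicate is illustrative and, per the javadoc, must already be free of not() and validated against the file schema.

import java.util.List;
import parquet.filter2.predicate.FilterApi;
import parquet.filter2.predicate.FilterPredicate;
import parquet.filter2.statisticslevel.StatisticsFilter;
import parquet.hadoop.metadata.ColumnChunkMetaData;

public class StatisticsFilterSketch {
  // True only when the chunk statistics prove no record satisfies id > 100,
  // e.g. the column's max is <= 100 or the chunk is entirely null.
  static boolean canSkipRowGroup(List<ColumnChunkMetaData> columns) {
    FilterPredicate pred = FilterApi.gt(FilterApi.longColumn("id"), 100L);
    return StatisticsFilter.canDrop(pred, columns);
  }
}
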
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
deleted file mode 100644
index d763882..0000000
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.format.converter;
-
-import static parquet.format.Util.readFileMetaData;
-import static parquet.format.Util.writePageHeader;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import parquet.Log;
-import parquet.hadoop.metadata.ColumnPath;
-import parquet.format.ColumnChunk;
-import parquet.format.ColumnMetaData;
-import parquet.format.ConvertedType;
-import parquet.format.DataPageHeader;
-import parquet.format.DataPageHeaderV2;
-import parquet.format.DictionaryPageHeader;
-import parquet.format.Encoding;
-import parquet.format.FieldRepetitionType;
-import parquet.format.FileMetaData;
-import parquet.format.KeyValue;
-import parquet.format.PageHeader;
-import parquet.format.PageType;
-import parquet.format.RowGroup;
-import parquet.format.SchemaElement;
-import parquet.format.Statistics;
-import parquet.format.Type;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.hadoop.metadata.ParquetMetadata;
-import parquet.io.ParquetDecodingException;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type.Repetition;
-import parquet.schema.TypeVisitor;
-import parquet.schema.Types;
-
-public class ParquetMetadataConverter {
- private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);
-
- public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parquetMetadata) {
- List<BlockMetaData> blocks = parquetMetadata.getBlocks();
- List<RowGroup> rowGroups = new ArrayList<RowGroup>();
- int numRows = 0;
- for (BlockMetaData block : blocks) {
- numRows += block.getRowCount();
- addRowGroup(parquetMetadata, rowGroups, block);
- }
- FileMetaData fileMetaData = new FileMetaData(
- currentVersion,
- toParquetSchema(parquetMetadata.getFileMetaData().getSchema()),
- numRows,
- rowGroups);
-
- Set<Entry<String, String>> keyValues = parquetMetadata.getFileMetaData().getKeyValueMetaData().entrySet();
- for (Entry<String, String> keyValue : keyValues) {
- addKeyValue(fileMetaData, keyValue.getKey(), keyValue.getValue());
- }
-
- fileMetaData.setCreated_by(parquetMetadata.getFileMetaData().getCreatedBy());
- return fileMetaData;
- }
-
- List<SchemaElement> toParquetSchema(MessageType schema) {
- List<SchemaElement> result = new ArrayList<SchemaElement>();
- addToList(result, schema);
- return result;
- }
-
- private void addToList(final List<SchemaElement> result, parquet.schema.Type field) {
- field.accept(new TypeVisitor() {
- @Override
- public void visit(PrimitiveType primitiveType) {
- SchemaElement element = new SchemaElement(primitiveType.getName());
- element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition()));
- element.setType(getType(primitiveType.getPrimitiveTypeName()));
- if (primitiveType.getOriginalType() != null) {
- element.setConverted_type(getConvertedType(primitiveType.getOriginalType()));
- }
- if (primitiveType.getDecimalMetadata() != null) {
- element.setPrecision(primitiveType.getDecimalMetadata().getPrecision());
- element.setScale(primitiveType.getDecimalMetadata().getScale());
- }
- if (primitiveType.getTypeLength() > 0) {
- element.setType_length(primitiveType.getTypeLength());
- }
- result.add(element);
- }
-
- @Override
- public void visit(MessageType messageType) {
- SchemaElement element = new SchemaElement(messageType.getName());
- visitChildren(result, messageType.asGroupType(), element);
- }
-
- @Override
- public void visit(GroupType groupType) {
- SchemaElement element = new SchemaElement(groupType.getName());
- element.setRepetition_type(toParquetRepetition(groupType.getRepetition()));
- if (groupType.getOriginalType() != null) {
- element.setConverted_type(getConvertedType(groupType.getOriginalType()));
- }
- visitChildren(result, groupType, element);
- }
-
- private void visitChildren(final List<SchemaElement> result,
- GroupType groupType, SchemaElement element) {
- element.setNum_children(groupType.getFieldCount());
- result.add(element);
- for (parquet.schema.Type field : groupType.getFields()) {
- addToList(result, field);
- }
- }
- });
- }
-
- private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGroups, BlockMetaData block) {
- //rowGroup.total_byte_size = ;
- List<ColumnChunkMetaData> columns = block.getColumns();
- List<ColumnChunk> parquetColumns = new ArrayList<ColumnChunk>();
- for (ColumnChunkMetaData columnMetaData : columns) {
- ColumnChunk columnChunk = new ColumnChunk(columnMetaData.getFirstDataPageOffset()); // verify this is the right offset
- columnChunk.file_path = block.getPath(); // they are in the same file for now
- columnChunk.meta_data = new parquet.format.ColumnMetaData(
- getType(columnMetaData.getType()),
- toFormatEncodings(columnMetaData.getEncodings()),
- Arrays.asList(columnMetaData.getPath().toArray()),
- columnMetaData.getCodec().getParquetCompressionCodec(),
- columnMetaData.getValueCount(),
- columnMetaData.getTotalUncompressedSize(),
- columnMetaData.getTotalSize(),
- columnMetaData.getFirstDataPageOffset());
- columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
- if (!columnMetaData.getStatistics().isEmpty()) {
- columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
- }
-// columnChunk.meta_data.index_page_offset = ;
-// columnChunk.meta_data.key_value_metadata = ; // nothing yet
-
- parquetColumns.add(columnChunk);
- }
- RowGroup rowGroup = new RowGroup(parquetColumns, block.getTotalByteSize(), block.getRowCount());
- rowGroups.add(rowGroup);
- }
-
- private List<Encoding> toFormatEncodings(Set<parquet.column.Encoding> encodings) {
- List<Encoding> converted = new ArrayList<Encoding>(encodings.size());
- for (parquet.column.Encoding encoding : encodings) {
- converted.add(getEncoding(encoding));
- }
- return converted;
- }
-
- private static final class EncodingList {
-
- private final Set<parquet.column.Encoding> encodings;
-
- public EncodingList(Set<parquet.column.Encoding> encodings) {
- this.encodings = encodings;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof EncodingList) {
- Set<parquet.column.Encoding> other = ((EncodingList)obj).encodings;
- return other.size() == encodings.size() && encodings.containsAll(other);
- }
- return false;
- }
-
- @Override
- public int hashCode() {
- int result = 1;
- for (parquet.column.Encoding element : encodings)
- result = 31 * result + (element == null ? 0 : element.hashCode());
- return result;
- }
- }
-
- private Map<EncodingList, Set<parquet.column.Encoding>> encodingLists = new HashMap<EncodingList, Set<parquet.column.Encoding>>();
-
- private Set<parquet.column.Encoding> fromFormatEncodings(List<Encoding> encodings) {
- Set<parquet.column.Encoding> converted = new HashSet<parquet.column.Encoding>();
- for (Encoding encoding : encodings) {
- converted.add(getEncoding(encoding));
- }
- converted = Collections.unmodifiableSet(converted);
- EncodingList key = new EncodingList(converted);
- Set<parquet.column.Encoding> cached = encodingLists.get(key);
- if (cached == null) {
- cached = converted;
- encodingLists.put(key, cached);
- }
- return cached;
- }
-
- public parquet.column.Encoding getEncoding(Encoding encoding) {
- return parquet.column.Encoding.valueOf(encoding.name());
- }
-
- public Encoding getEncoding(parquet.column.Encoding encoding) {
- return Encoding.valueOf(encoding.name());
- }
-
- public static Statistics toParquetStatistics(parquet.column.statistics.Statistics statistics) {
- Statistics stats = new Statistics();
- if (!statistics.isEmpty()) {
- stats.setNull_count(statistics.getNumNulls());
- if(statistics.hasNonNullValue()) {
- stats.setMax(statistics.getMaxBytes());
- stats.setMin(statistics.getMinBytes());
- }
- }
- return stats;
- }
-
- public static parquet.column.statistics.Statistics fromParquetStatistics(Statistics statistics, PrimitiveTypeName type) {
- // create stats object based on the column type
- parquet.column.statistics.Statistics stats = parquet.column.statistics.Statistics.getStatsBasedOnType(type);
-    // If no statistics were written to the footer, the empty Statistics object created above is returned
- if (statistics != null) {
- if (statistics.isSetMax() && statistics.isSetMin()) {
- stats.setMinMaxFromBytes(statistics.min.array(), statistics.max.array());
- }
- stats.setNumNulls(statistics.null_count);
- }
- return stats;
- }
-
- public PrimitiveTypeName getPrimitive(Type type) {
- switch (type) {
- case BYTE_ARRAY: // TODO: rename BINARY and remove this switch
- return PrimitiveTypeName.BINARY;
- case INT64:
- return PrimitiveTypeName.INT64;
- case INT32:
- return PrimitiveTypeName.INT32;
- case BOOLEAN:
- return PrimitiveTypeName.BOOLEAN;
- case FLOAT:
- return PrimitiveTypeName.FLOAT;
- case DOUBLE:
- return PrimitiveTypeName.DOUBLE;
- case INT96:
- return PrimitiveTypeName.INT96;
- case FIXED_LEN_BYTE_ARRAY:
- return PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
- default:
- throw new RuntimeException("Unknown type " + type);
- }
- }
-
- Type getType(PrimitiveTypeName type) {
- switch (type) {
- case INT64:
- return Type.INT64;
- case INT32:
- return Type.INT32;
- case BOOLEAN:
- return Type.BOOLEAN;
- case BINARY:
- return Type.BYTE_ARRAY;
- case FLOAT:
- return Type.FLOAT;
- case DOUBLE:
- return Type.DOUBLE;
- case INT96:
- return Type.INT96;
- case FIXED_LEN_BYTE_ARRAY:
- return Type.FIXED_LEN_BYTE_ARRAY;
- default:
- throw new RuntimeException("Unknown primitive type " + type);
- }
- }
-
- OriginalType getOriginalType(ConvertedType type) {
- switch (type) {
- case UTF8:
- return OriginalType.UTF8;
- case MAP:
- return OriginalType.MAP;
- case MAP_KEY_VALUE:
- return OriginalType.MAP_KEY_VALUE;
- case LIST:
- return OriginalType.LIST;
- case ENUM:
- return OriginalType.ENUM;
- case DECIMAL:
- return OriginalType.DECIMAL;
- case DATE:
- return OriginalType.DATE;
- case TIME_MILLIS:
- return OriginalType.TIME_MILLIS;
- case TIMESTAMP_MILLIS:
- return OriginalType.TIMESTAMP_MILLIS;
- case INTERVAL:
- return OriginalType.INTERVAL;
- case INT_8:
- return OriginalType.INT_8;
- case INT_16:
- return OriginalType.INT_16;
- case INT_32:
- return OriginalType.INT_32;
- case INT_64:
- return OriginalType.INT_64;
- case UINT_8:
- return OriginalType.UINT_8;
- case UINT_16:
- return OriginalType.UINT_16;
- case UINT_32:
- return OriginalType.UINT_32;
- case UINT_64:
- return OriginalType.UINT_64;
- case JSON:
- return OriginalType.JSON;
- case BSON:
- return OriginalType.BSON;
- default:
- throw new RuntimeException("Unknown converted type " + type);
- }
- }
-
- ConvertedType getConvertedType(OriginalType type) {
- switch (type) {
- case UTF8:
- return ConvertedType.UTF8;
- case MAP:
- return ConvertedType.MAP;
- case MAP_KEY_VALUE:
- return ConvertedType.MAP_KEY_VALUE;
- case LIST:
- return ConvertedType.LIST;
- case ENUM:
- return ConvertedType.ENUM;
- case DECIMAL:
- return ConvertedType.DECIMAL;
- case DATE:
- return ConvertedType.DATE;
- case TIME_MILLIS:
- return ConvertedType.TIME_MILLIS;
- case TIMESTAMP_MILLIS:
- return ConvertedType.TIMESTAMP_MILLIS;
- case INTERVAL:
- return ConvertedType.INTERVAL;
- case INT_8:
- return ConvertedType.INT_8;
- case INT_16:
- return ConvertedType.INT_16;
- case INT_32:
- return ConvertedType.INT_32;
- case INT_64:
- return ConvertedType.INT_64;
- case UINT_8:
- return ConvertedType.UINT_8;
- case UINT_16:
- return ConvertedType.UINT_16;
- case UINT_32:
- return ConvertedType.UINT_32;
- case UINT_64:
- return ConvertedType.UINT_64;
- case JSON:
- return ConvertedType.JSON;
- case BSON:
- return ConvertedType.BSON;
- default:
- throw new RuntimeException("Unknown original type " + type);
- }
- }
-
- private void addKeyValue(FileMetaData fileMetaData, String key, String value) {
- KeyValue keyValue = new KeyValue(key);
- keyValue.value = value;
- fileMetaData.addToKey_value_metadata(keyValue);
- }
-
- private static interface MetadataFilterVisitor<T, E extends Throwable> {
- T visit(NoFilter filter) throws E;
- T visit(SkipMetadataFilter filter) throws E;
- T visit(RangeMetadataFilter filter) throws E;
- }
-
- public abstract static class MetadataFilter {
- private MetadataFilter() {}
- abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
- }
- public static final MetadataFilter NO_FILTER = new NoFilter();
- public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
- /**
- * [ startOffset, endOffset )
- * @param startOffset
- * @param endOffset
- * @return the filter
- */
- public static final MetadataFilter range(long startOffset, long endOffset) {
- return new RangeMetadataFilter(startOffset, endOffset);
- }
- private static final class NoFilter extends MetadataFilter {
- private NoFilter() {}
- @Override
- <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
- return visitor.visit(this);
- }
- @Override
- public String toString() {
- return "NO_FILTER";
- }
- }
- private static final class SkipMetadataFilter extends MetadataFilter {
- private SkipMetadataFilter() {}
- @Override
- <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
- return visitor.visit(this);
- }
- @Override
- public String toString() {
- return "SKIP_ROW_GROUPS";
- }
- }
- /**
- * [ startOffset, endOffset )
- * @author Julien Le Dem
- */
- static final class RangeMetadataFilter extends MetadataFilter {
- final long startOffset;
- final long endOffset;
- RangeMetadataFilter(long startOffset, long endOffset) {
- super();
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
- @Override
- <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
- return visitor.visit(this);
- }
- boolean contains(long offset) {
- return offset >= this.startOffset && offset < this.endOffset;
- }
- @Override
- public String toString() {
- return "range(s:" + startOffset + ", e:" + endOffset + ")";
- }
- }
-
- @Deprecated
- public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
- return readParquetMetadata(from, NO_FILTER);
- }
-
- static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
- List<RowGroup> rowGroups = metaData.getRow_groups();
- List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
- for (RowGroup rowGroup : rowGroups) {
- long totalSize = 0;
- long startIndex = getOffset(rowGroup.getColumns().get(0));
- for (ColumnChunk col : rowGroup.getColumns()) {
- totalSize += col.getMeta_data().getTotal_compressed_size();
- }
- long midPoint = startIndex + totalSize / 2;
- if (filter.contains(midPoint)) {
- newRowGroups.add(rowGroup);
- }
- }
- metaData.setRow_groups(newRowGroups);
- return metaData;
- }
-
- static long getOffset(RowGroup rowGroup) {
- return getOffset(rowGroup.getColumns().get(0));
- }
- static long getOffset(ColumnChunk columnChunk) {
- ColumnMetaData md = columnChunk.getMeta_data();
- long offset = md.getData_page_offset();
- if (md.isSetDictionary_page_offset() && offset > md.getDictionary_page_offset()) {
- offset = md.getDictionary_page_offset();
- }
- return offset;
- }
-
- public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
- FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
- @Override
- public FileMetaData visit(NoFilter filter) throws IOException {
- return readFileMetaData(from);
- }
- @Override
- public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
- return readFileMetaData(from, true);
- }
- @Override
- public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
- return filterFileMetaData(readFileMetaData(from), filter);
- }
- });
- if (Log.DEBUG) LOG.debug(fileMetaData);
- ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
- if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
- return parquetMetadata;
- }
-
- public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
- MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
- List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
- List<RowGroup> row_groups = parquetMetadata.getRow_groups();
- if (row_groups != null) {
- for (RowGroup rowGroup : row_groups) {
- BlockMetaData blockMetaData = new BlockMetaData();
- blockMetaData.setRowCount(rowGroup.getNum_rows());
- blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
- List<ColumnChunk> columns = rowGroup.getColumns();
- String filePath = columns.get(0).getFile_path();
- for (ColumnChunk columnChunk : columns) {
- if ((filePath == null && columnChunk.getFile_path() != null)
- || (filePath != null && !filePath.equals(columnChunk.getFile_path()))) {
- throw new ParquetDecodingException("all column chunks of the same row group must be in the same file for now");
- }
- parquet.format.ColumnMetaData metaData = columnChunk.meta_data;
- ColumnPath path = getPath(metaData);
- ColumnChunkMetaData column = ColumnChunkMetaData.get(
- path,
- messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(),
- CompressionCodecName.fromParquet(metaData.codec),
- fromFormatEncodings(metaData.encodings),
- fromParquetStatistics(metaData.statistics, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName()),
- metaData.data_page_offset,
- metaData.dictionary_page_offset,
- metaData.num_values,
- metaData.total_compressed_size,
- metaData.total_uncompressed_size);
- // TODO
- // index_page_offset
- // key_value_metadata
- blockMetaData.addColumn(column);
- }
- blockMetaData.setPath(filePath);
- blocks.add(blockMetaData);
- }
- }
- Map<String, String> keyValueMetaData = new HashMap<String, String>();
- List<KeyValue> key_value_metadata = parquetMetadata.getKey_value_metadata();
- if (key_value_metadata != null) {
- for (KeyValue keyValue : key_value_metadata) {
- keyValueMetaData.put(keyValue.key, keyValue.value);
- }
- }
- return new ParquetMetadata(
- new parquet.hadoop.metadata.FileMetaData(messageType, keyValueMetaData, parquetMetadata.getCreated_by()),
- blocks);
- }
-
- private ColumnPath getPath(parquet.format.ColumnMetaData metaData) {
- String[] path = metaData.path_in_schema.toArray(new String[metaData.path_in_schema.size()]);
- return ColumnPath.get(path);
- }
-
- MessageType fromParquetSchema(List<SchemaElement> schema) {
- Iterator<SchemaElement> iterator = schema.iterator();
- SchemaElement root = iterator.next();
- Types.MessageTypeBuilder builder = Types.buildMessage();
- buildChildren(builder, iterator, root.getNum_children());
- return builder.named(root.name);
- }
-
- private void buildChildren(Types.GroupBuilder builder,
- Iterator<SchemaElement> schema,
- int childrenCount) {
- for (int i = 0; i < childrenCount; i++) {
- SchemaElement schemaElement = schema.next();
-
- // Create Parquet Type.
- Types.Builder childBuilder;
- if (schemaElement.type != null) {
- Types.PrimitiveBuilder primitiveBuilder = builder.primitive(
- getPrimitive(schemaElement.type),
- fromParquetRepetition(schemaElement.repetition_type));
- if (schemaElement.isSetType_length()) {
- primitiveBuilder.length(schemaElement.type_length);
- }
- if (schemaElement.isSetPrecision()) {
- primitiveBuilder.precision(schemaElement.precision);
- }
- if (schemaElement.isSetScale()) {
- primitiveBuilder.scale(schemaElement.scale);
- }
- childBuilder = primitiveBuilder;
-
- } else {
- childBuilder = builder.group(fromParquetRepetition(schemaElement.repetition_type));
- buildChildren((Types.GroupBuilder) childBuilder, schema, schemaElement.num_children);
- }
-
- if (schemaElement.isSetConverted_type()) {
- childBuilder.as(getOriginalType(schemaElement.converted_type));
- }
- if (schemaElement.isSetField_id()) {
- childBuilder.id(schemaElement.field_id);
- }
-
- childBuilder.named(schemaElement.name);
- }
- }
-
- FieldRepetitionType toParquetRepetition(Repetition repetition) {
- return FieldRepetitionType.valueOf(repetition.name());
- }
-
- Repetition fromParquetRepetition(FieldRepetitionType repetition) {
- return Repetition.valueOf(repetition.name());
- }
-
- @Deprecated
- public void writeDataPageHeader(
- int uncompressedSize,
- int compressedSize,
- int valueCount,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding,
- OutputStream to) throws IOException {
- writePageHeader(newDataPageHeader(uncompressedSize,
- compressedSize,
- valueCount,
- new parquet.column.statistics.BooleanStatistics(),
- rlEncoding,
- dlEncoding,
- valuesEncoding), to);
- }
-
- public void writeDataPageHeader(
- int uncompressedSize,
- int compressedSize,
- int valueCount,
- parquet.column.statistics.Statistics statistics,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding,
- OutputStream to) throws IOException {
- writePageHeader(newDataPageHeader(uncompressedSize, compressedSize, valueCount, statistics, rlEncoding, dlEncoding, valuesEncoding), to);
- }
-
- private PageHeader newDataPageHeader(
- int uncompressedSize, int compressedSize,
- int valueCount,
- parquet.column.statistics.Statistics statistics,
- parquet.column.Encoding rlEncoding,
- parquet.column.Encoding dlEncoding,
- parquet.column.Encoding valuesEncoding) {
- PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
- // TODO: pageHeader.crc = ...;
- pageHeader.setData_page_header(new DataPageHeader(
- valueCount,
- getEncoding(valuesEncoding),
- getEncoding(dlEncoding),
- getEncoding(rlEncoding)));
- if (!statistics.isEmpty()) {
- pageHeader.getData_page_header().setStatistics(toParquetStatistics(statistics));
- }
- return pageHeader;
- }
-
- public void writeDataPageV2Header(
- int uncompressedSize, int compressedSize,
- int valueCount, int nullCount, int rowCount,
- parquet.column.statistics.Statistics statistics,
- parquet.column.Encoding dataEncoding,
- int rlByteLength, int dlByteLength,
- OutputStream to) throws IOException {
- writePageHeader(
- newDataPageV2Header(
- uncompressedSize, compressedSize,
- valueCount, nullCount, rowCount,
- statistics,
- dataEncoding,
- rlByteLength, dlByteLength), to);
- }
-
- private PageHeader newDataPageV2Header(
- int uncompressedSize, int compressedSize,
- int valueCount, int nullCount, int rowCount,
- parquet.column.statistics.Statistics<?> statistics,
- parquet.column.Encoding dataEncoding,
- int rlByteLength, int dlByteLength) {
- // TODO: pageHeader.crc = ...;
- DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2(
- valueCount, nullCount, rowCount,
- getEncoding(dataEncoding),
- dlByteLength, rlByteLength);
- if (!statistics.isEmpty()) {
- dataPageHeaderV2.setStatistics(toParquetStatistics(statistics));
- }
- PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize);
- pageHeader.setData_page_header_v2(dataPageHeaderV2);
- return pageHeader;
- }
-
- public void writeDictionaryPageHeader(
- int uncompressedSize, int compressedSize, int valueCount,
- parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
- PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
- pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
- writePageHeader(pageHeader, to);
- }
-
-}
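
A sketch of driving the converter above when reading a footer for a single split: NO_FILTER keeps all row groups, SKIP_ROW_GROUPS drops row-group metadata entirely, and range(start, end) keeps only row groups whose midpoint falls in [start, end). The stream is assumed to be positioned at the serialized footer.

import java.io.IOException;
import java.io.InputStream;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.metadata.ParquetMetadata;

public class FooterReadSketch {
  static ParquetMetadata readForSplit(InputStream footer, long start, long end) throws IOException {
    ParquetMetadataConverter converter = new ParquetMetadataConverter();
    // Only row groups whose midpoint lies inside this split survive the filter.
    return converter.readParquetMetadata(footer, ParquetMetadataConverter.range(start, end));
  }
}
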
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java b/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java
deleted file mode 100644
index 0e0653e..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/BadConfigurationException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import parquet.ParquetRuntimeException;
-
-/**
- * Thrown when the input/output formats are misconfigured
- *
- * @author Julien Le Dem
- *
- */
-public class BadConfigurationException extends ParquetRuntimeException {
- private static final long serialVersionUID = 1L;
-
- public BadConfigurationException() {
- }
-
- public BadConfigurationException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public BadConfigurationException(String message) {
- super(message);
- }
-
- public BadConfigurationException(Throwable cause) {
- super(cause);
- }
-
-}
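
Usage is the same as any other ParquetRuntimeException; a hypothetical example (the check and message are made up for illustration):

import parquet.hadoop.BadConfigurationException;

public class BadConfigSketch {
  static void requireWriteSupportClass(String className) {
    if (className == null) {
      // Mirrors how CodecFactory (below) reports an unknown codec class.
      throw new BadConfigurationException("write support class was not configured");
    }
  }
}
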
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
deleted file mode 100644
index df82c98..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import parquet.bytes.BytesInput;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-class CodecFactory {
-
- public class BytesDecompressor {
-
- private final CompressionCodec codec;
- private final Decompressor decompressor;
-
- public BytesDecompressor(CompressionCodec codec) {
- this.codec = codec;
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- } else {
- decompressor = null;
- }
- }
-
- public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
- final BytesInput decompressed;
- if (codec != null) {
- decompressor.reset();
- InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
- decompressed = BytesInput.from(is, uncompressedSize);
- } else {
- decompressed = bytes;
- }
- return decompressed;
- }
-
- private void release() {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
- }
-
- /**
- * Encapsulates the logic around hadoop compression
- *
- * @author Julien Le Dem
- *
- */
- public static class BytesCompressor {
-
- private final CompressionCodec codec;
- private final Compressor compressor;
- private final ByteArrayOutputStream compressedOutBuffer;
- private final CompressionCodecName codecName;
-
- public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
- this.codecName = codecName;
- this.codec = codec;
- if (codec != null) {
- this.compressor = CodecPool.getCompressor(codec);
- this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
- } else {
- this.compressor = null;
- this.compressedOutBuffer = null;
- }
- }
-
- public BytesInput compress(BytesInput bytes) throws IOException {
- final BytesInput compressedBytes;
- if (codec == null) {
- compressedBytes = bytes;
- } else {
- compressedOutBuffer.reset();
- if (compressor != null) {
- // null compressor for non-native gzip
- compressor.reset();
- }
- CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
- bytes.writeAllTo(cos);
- cos.finish();
- cos.close();
- compressedBytes = BytesInput.from(compressedOutBuffer);
- }
- return compressedBytes;
- }
-
- private void release() {
- if (compressor != null) {
- CodecPool.returnCompressor(compressor);
- }
- }
-
- public CompressionCodecName getCodecName() {
- return codecName;
- }
-
- }
-
- private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
- private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
- private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
- private final Configuration configuration;
-
- public CodecFactory(Configuration configuration) {
- this.configuration = configuration;
- }
-
- /**
- *
- * @param codecName the requested codec
- * @return the corresponding hadoop codec. null if UNCOMPRESSED
- */
- private CompressionCodec getCodec(CompressionCodecName codecName) {
- String codecClassName = codecName.getHadoopCompressionCodecClassName();
- if (codecClassName == null) {
- return null;
- }
- CompressionCodec codec = codecByName.get(codecClassName);
- if (codec != null) {
- return codec;
- }
-
- try {
- Class<?> codecClass = Class.forName(codecClassName);
- codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
- codecByName.put(codecClassName, codec);
- return codec;
- } catch (ClassNotFoundException e) {
- throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
- }
- }
-
- public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
- BytesCompressor comp = compressors.get(codecName);
- if (comp == null) {
- CompressionCodec codec = getCodec(codecName);
- comp = new BytesCompressor(codecName, codec, pageSize);
- compressors.put(codecName, comp);
- }
- return comp;
- }
-
- public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
- BytesDecompressor decomp = decompressors.get(codecName);
- if (decomp == null) {
- CompressionCodec codec = getCodec(codecName);
- decomp = new BytesDecompressor(codec);
- decompressors.put(codecName, decomp);
- }
- return decomp;
- }
-
- public void release() {
- for (BytesCompressor compressor : compressors.values()) {
- compressor.release();
- }
- compressors.clear();
- for (BytesDecompressor decompressor : decompressors.values()) {
- decompressor.release();
- }
- decompressors.clear();
- }
-}
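
A round-trip sketch for the (package-private) factory above; it would have to live in package parquet.hadoop, and the 1 MB page size passed to getCompressor is only an initial buffer hint.

package parquet.hadoop;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import parquet.bytes.BytesInput;
import parquet.hadoop.metadata.CompressionCodecName;

public class CodecRoundTripSketch {
  static byte[] roundTrip(byte[] raw) throws IOException {
    CodecFactory codecs = new CodecFactory(new Configuration());
    try {
      CodecFactory.BytesCompressor compressor =
          codecs.getCompressor(CompressionCodecName.GZIP, 1024 * 1024);
      BytesInput compressed = compressor.compress(BytesInput.from(raw));
      // Decompression needs the uncompressed size, which Parquet keeps in the page header.
      return codecs.getDecompressor(CompressionCodecName.GZIP)
          .decompress(compressed, raw.length)
          .toByteArray();
    } finally {
      codecs.release(); // hand compressors/decompressors back to Hadoop's CodecPool
    }
  }
}
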
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
deleted file mode 100644
index 30349be..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageReadStore.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import parquet.Ints;
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.DataPage;
-import parquet.column.page.DataPageV1;
-import parquet.column.page.DataPageV2;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageReadStore;
-import parquet.column.page.PageReader;
-import parquet.hadoop.CodecFactory.BytesDecompressor;
-import parquet.io.ParquetDecodingException;
-
-/**
- * TODO: should this actually be called RowGroupImpl or something?
- * The name is kind of confusing since it references three different "entities"
- * in our format: columns, chunks, and pages
- *
- */
-class ColumnChunkPageReadStore implements PageReadStore {
- private static final Log LOG = Log.getLog(ColumnChunkPageReadStore.class);
-
- /**
- * PageReader for a single column chunk. A column chunk contains
- * several pages, which are yielded one by one in order.
- *
- * This implementation is provided with a list of pages, each of which
- * is decompressed and passed through.
- */
- static final class ColumnChunkPageReader implements PageReader {
-
- private final BytesDecompressor decompressor;
- private final long valueCount;
- private final List<DataPage> compressedPages;
- private final DictionaryPage compressedDictionaryPage;
-
- ColumnChunkPageReader(BytesDecompressor decompressor, List<DataPage> compressedPages, DictionaryPage compressedDictionaryPage) {
- this.decompressor = decompressor;
- this.compressedPages = new LinkedList<DataPage>(compressedPages);
- this.compressedDictionaryPage = compressedDictionaryPage;
- int count = 0;
- for (DataPage p : compressedPages) {
- count += p.getValueCount();
- }
- this.valueCount = count;
- }
-
- @Override
- public long getTotalValueCount() {
- return valueCount;
- }
-
- @Override
- public DataPage readPage() {
- if (compressedPages.isEmpty()) {
- return null;
- }
- DataPage compressedPage = compressedPages.remove(0);
- return compressedPage.accept(new DataPage.Visitor<DataPage>() {
- @Override
- public DataPage visit(DataPageV1 dataPageV1) {
- try {
- return new DataPageV1(
- decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize()),
- dataPageV1.getValueCount(),
- dataPageV1.getUncompressedSize(),
- dataPageV1.getStatistics(),
- dataPageV1.getRlEncoding(),
- dataPageV1.getDlEncoding(),
- dataPageV1.getValueEncoding());
- } catch (IOException e) {
- throw new ParquetDecodingException("could not decompress page", e);
- }
- }
-
- @Override
- public DataPage visit(DataPageV2 dataPageV2) {
- if (!dataPageV2.isCompressed()) {
- return dataPageV2;
- }
- try {
- int uncompressedSize = Ints.checkedCast(
- dataPageV2.getUncompressedSize()
- - dataPageV2.getDefinitionLevels().size()
- - dataPageV2.getRepetitionLevels().size());
- return DataPageV2.uncompressed(
- dataPageV2.getRowCount(),
- dataPageV2.getNullCount(),
- dataPageV2.getValueCount(),
- dataPageV2.getRepetitionLevels(),
- dataPageV2.getDefinitionLevels(),
- dataPageV2.getDataEncoding(),
- decompressor.decompress(dataPageV2.getData(), uncompressedSize),
- dataPageV2.getStatistics()
- );
- } catch (IOException e) {
- throw new ParquetDecodingException("could not decompress page", e);
- }
- }
- });
- }
-
- @Override
- public DictionaryPage readDictionaryPage() {
- if (compressedDictionaryPage == null) {
- return null;
- }
- try {
- return new DictionaryPage(
- decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
- compressedDictionaryPage.getDictionarySize(),
- compressedDictionaryPage.getEncoding());
- } catch (IOException e) {
- throw new RuntimeException(e); // TODO: cleanup
- }
- }
- }
-
- private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<ColumnDescriptor, ColumnChunkPageReader>();
- private final long rowCount;
-
- public ColumnChunkPageReadStore(long rowCount) {
- this.rowCount = rowCount;
- }
-
- @Override
- public long getRowCount() {
- return rowCount;
- }
-
- @Override
- public PageReader getPageReader(ColumnDescriptor path) {
- if (!readers.containsKey(path)) {
- throw new IllegalArgumentException(path + " is not in the store: " + readers.keySet() + " " + rowCount);
- }
- return readers.get(path);
- }
-
- void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) {
- if (readers.put(path, reader) != null) {
- throw new RuntimeException(path+ " was added twice");
- }
- }
-
-}
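
A sketch (also in package parquet.hadoop, since the store is package-private) that drains one column chunk; the ColumnDescriptor is assumed to come from the file's MessageType.

package parquet.hadoop;

import parquet.column.ColumnDescriptor;
import parquet.column.page.DataPage;
import parquet.column.page.PageReader;

public class PageReadSketch {
  // Sums the value counts of every (decompressed-on-read) data page in the chunk;
  // the total should match reader.getTotalValueCount().
  static long countValues(ColumnChunkPageReadStore store, ColumnDescriptor column) {
    PageReader reader = store.getPageReader(column);
    long values = 0;
    for (DataPage page = reader.readPage(); page != null; page = reader.readPage()) {
      values += page.getValueCount();
    }
    return values;
  }
}
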
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
deleted file mode 100644
index e3bab0d..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import static parquet.Log.INFO;
-import static parquet.column.statistics.Statistics.getStatsBasedOnType;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import parquet.Log;
-import parquet.bytes.BytesInput;
-import parquet.bytes.ConcatenatingByteArrayCollector;
-import parquet.column.ColumnDescriptor;
-import parquet.column.Encoding;
-import parquet.column.page.DictionaryPage;
-import parquet.column.page.PageWriteStore;
-import parquet.column.page.PageWriter;
-import parquet.column.statistics.Statistics;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.CodecFactory.BytesCompressor;
-import parquet.io.ParquetEncodingException;
-import parquet.schema.MessageType;
-
-class ColumnChunkPageWriteStore implements PageWriteStore {
- private static final Log LOG = Log.getLog(ColumnChunkPageWriteStore.class);
-
- private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
-
- private static final class ColumnChunkPageWriter implements PageWriter {
-
- private final ColumnDescriptor path;
- private final BytesCompressor compressor;
-
- private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
- private final ConcatenatingByteArrayCollector buf;
- private DictionaryPage dictionaryPage;
-
- private long uncompressedLength;
- private long compressedLength;
- private long totalValueCount;
- private int pageCount;
-
- private Set<Encoding> encodings = new HashSet<Encoding>();
-
- private Statistics totalStatistics;
-
- private ColumnChunkPageWriter(ColumnDescriptor path, BytesCompressor compressor, int pageSize) {
- this.path = path;
- this.compressor = compressor;
- this.buf = new ConcatenatingByteArrayCollector();
- this.totalStatistics = getStatsBasedOnType(this.path.getType());
- }
-
- @Override
- public void writePage(BytesInput bytes,
- int valueCount,
- Statistics statistics,
- Encoding rlEncoding,
- Encoding dlEncoding,
- Encoding valuesEncoding) throws IOException {
- long uncompressedSize = bytes.size();
- if (uncompressedSize > Integer.MAX_VALUE) {
- throw new ParquetEncodingException(
- "Cannot write page larger than Integer.MAX_VALUE bytes: " +
- uncompressedSize);
- }
- BytesInput compressedBytes = compressor.compress(bytes);
- long compressedSize = compressedBytes.size();
- if (compressedSize > Integer.MAX_VALUE) {
- throw new ParquetEncodingException(
- "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
- + compressedSize);
- }
- tempOutputStream.reset();
- parquetMetadataConverter.writeDataPageHeader(
- (int)uncompressedSize,
- (int)compressedSize,
- valueCount,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- tempOutputStream);
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
- this.totalStatistics.mergeStatistics(statistics);
- // by concatenating before collecting instead of collecting twice,
- // we only allocate one buffer to copy into instead of multiple.
- buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes));
- encodings.add(rlEncoding);
- encodings.add(dlEncoding);
- encodings.add(valuesEncoding);
- }
-
- @Override
- public void writePageV2(
- int rowCount, int nullCount, int valueCount,
- BytesInput repetitionLevels, BytesInput definitionLevels,
- Encoding dataEncoding, BytesInput data,
- Statistics<?> statistics) throws IOException {
- int rlByteLength = toIntWithCheck(repetitionLevels.size());
- int dlByteLength = toIntWithCheck(definitionLevels.size());
- int uncompressedSize = toIntWithCheck(
- data.size() + repetitionLevels.size() + definitionLevels.size()
- );
- // TODO: decide if we compress
- BytesInput compressedData = compressor.compress(data);
- int compressedSize = toIntWithCheck(
- compressedData.size() + repetitionLevels.size() + definitionLevels.size()
- );
- tempOutputStream.reset();
- parquetMetadataConverter.writeDataPageV2Header(
- uncompressedSize, compressedSize,
- valueCount, nullCount, rowCount,
- statistics,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- tempOutputStream);
- this.uncompressedLength += uncompressedSize;
- this.compressedLength += compressedSize;
- this.totalValueCount += valueCount;
- this.pageCount += 1;
- this.totalStatistics.mergeStatistics(statistics);
-
- // by concatenating before collecting instead of collecting twice,
- // we only allocate one buffer to copy into instead of multiple.
- buf.collect(
- BytesInput.concat(
- BytesInput.from(tempOutputStream),
- repetitionLevels,
- definitionLevels,
- compressedData)
- );
- encodings.add(dataEncoding);
- }
-
- private int toIntWithCheck(long size) {
- if (size > Integer.MAX_VALUE) {
- throw new ParquetEncodingException(
- "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
- size);
- }
- return (int)size;
- }
-
- @Override
- public long getMemSize() {
- return buf.size();
- }
-
- public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
- writer.startColumn(path, totalValueCount, compressor.getCodecName());
- if (dictionaryPage != null) {
- writer.writeDictionaryPage(dictionaryPage);
- encodings.add(dictionaryPage.getEncoding());
- }
- writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList<Encoding>(encodings));
- writer.endColumn();
- if (INFO) {
- LOG.info(
- String.format(
- "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
- buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings)
- + (dictionaryPage != null ? String.format(
- ", dic { %,d entries, %,dB raw, %,dB comp}",
- dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize())
- : ""));
- }
- encodings.clear();
- pageCount = 0;
- }
-
- @Override
- public long allocatedSize() {
- return buf.size();
- }
-
- @Override
- public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- if (this.dictionaryPage != null) {
- throw new ParquetEncodingException("Only one dictionary page is allowed");
- }
- BytesInput dictionaryBytes = dictionaryPage.getBytes();
- int uncompressedSize = (int)dictionaryBytes.size();
- BytesInput compressedBytes = compressor.compress(dictionaryBytes);
- this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize, dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
- }
-
- @Override
- public String memUsageString(String prefix) {
- return buf.memUsageString(prefix + " ColumnChunkPageWriter");
- }
- }
-
- private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
- private final MessageType schema;
-
- public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, int pageSize) {
- this.schema = schema;
- for (ColumnDescriptor path : schema.getColumns()) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, pageSize));
- }
- }
-
- @Override
- public PageWriter getPageWriter(ColumnDescriptor path) {
- return writers.get(path);
- }
-
- public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
- for (ColumnDescriptor path : schema.getColumns()) {
- ColumnChunkPageWriter pageWriter = writers.get(path);
- pageWriter.writeToFileWriter(writer);
- }
- }
-
-}
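A corresponding sketch of the write path, again assuming a same-package caller (the store is package-private) and that the page bytes, value count and statistics have already been produced by a column writer; the encodings and helper names are illustrative only.

package parquet.hadoop;

import java.io.IOException;

import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.Encoding;
import parquet.column.page.PageWriter;
import parquet.column.statistics.Statistics;
import parquet.schema.MessageType;

// Hypothetical walk-through (not part of the original sources): buffers one v1 page
// per column, then flushes the buffered column chunks to the file writer.
class PageWriteStoreSketch {
  static void writeOnePagePerColumn(CodecFactory.BytesCompressor compressor,
                                    MessageType schema,
                                    int pageSize,
                                    ParquetFileWriter fileWriter,
                                    BytesInput pageBytes,
                                    int valueCount,
                                    Statistics<?> statistics) throws IOException {
    ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor, schema, pageSize);
    for (ColumnDescriptor column : schema.getColumns()) {
      PageWriter pageWriter = store.getPageWriter(column);
      // writePage compresses the bytes, writes the page header into a temp buffer and
      // collects header + compressed data in memory; the encodings here are only illustrative
      pageWriter.writePage(pageBytes, valueCount, statistics,
          Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
    }
    // one startColumn/writeDataPages/endColumn sequence per column, in schema order
    store.flushToFileWriter(fileWriter);
  }
}

Note that everything stays buffered in the ConcatenatingByteArrayCollector until flushToFileWriter is called, which is why getMemSize() and allocatedSize() both report the collector's size.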
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java b/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java
deleted file mode 100644
index 980a120..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/Footer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-
-import org.apache.hadoop.fs.Path;
-
-import parquet.hadoop.metadata.ParquetMetadata;
-
-/**
- *
- * Represents the footer for a given file
- *
- * @author Julien Le Dem
- *
- */
-public class Footer {
-
- private final Path file;
-
- private final ParquetMetadata parquetMetadata;
-
- public Footer(Path file, ParquetMetadata parquetMetadata) {
- super();
- this.file = file;
- this.parquetMetadata = parquetMetadata;
- }
-
- public Path getFile() {
- return file;
- }
-
- public ParquetMetadata getParquetMetadata() {
- return parquetMetadata;
- }
-
- @Override
- public String toString() {
- return "Footer{"+file+", "+parquetMetadata+"}";
- }
-}
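Footer is a plain value holder; a minimal usage sketch, assuming the ParquetMetadata was already read elsewhere (the helper name is hypothetical):

import org.apache.hadoop.fs.Path;

import parquet.hadoop.Footer;
import parquet.hadoop.metadata.ParquetMetadata;

// Hypothetical helper: pairs already-read metadata with the path it came from.
class FooterSketch {
  static Footer describe(Path file, ParquetMetadata metadata) {
    Footer footer = new Footer(file, metadata);
    System.out.println(footer); // prints Footer{<path>, <metadata>}
    return footer;
  }
}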
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
deleted file mode 100644
index 8ae7c57..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.hadoop;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import parquet.Log;
-import parquet.column.ColumnDescriptor;
-import parquet.column.page.PageReadStore;
-import parquet.filter.UnboundRecordFilter;
-import parquet.filter2.compat.FilterCompat;
-import parquet.filter2.compat.FilterCompat.Filter;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.util.counters.BenchmarkCounter;
-import parquet.io.ColumnIOFactory;
-import parquet.io.MessageColumnIO;
-import parquet.io.ParquetDecodingException;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import static java.lang.String.format;
-import static parquet.Log.DEBUG;
-import static parquet.Preconditions.checkNotNull;
-import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
-
-class InternalParquetRecordReader<T> {
- private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
-
- private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
- private final Filter filter;
-
- private MessageType requestedSchema;
- private MessageType fileSchema;
- private int columnCount;
- private final ReadSupport<T> readSupport;
-
- private RecordMaterializer<T> recordConverter;
-
- private T currentValue;
- private long total;
- private long current = 0;
- private int currentBlock = -1;
- private ParquetFileReader reader;
- private parquet.io.RecordReader<T> recordReader;
- private boolean strictTypeChecking;
-
- private long totalTimeSpentReadingBytes;
- private long totalTimeSpentProcessingRecords;
- private long startedAssemblingCurrentBlockAt;
-
- private long totalCountLoadedSoFar = 0;
-
- private Path file;
-
- /**
- * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
- * @param filter for filtering individual records
- */
- public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
- this.readSupport = readSupport;
- this.filter = checkNotNull(filter, "filter");
- }
-
- /**
- * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
- */
- public InternalParquetRecordReader(ReadSupport<T> readSupport) {
- this(readSupport, FilterCompat.NOOP);
- }
-
- /**
- * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
- * @param filter Optional filter for only returning matching records.
- * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)}
- */
- @Deprecated
- public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
- this(readSupport, FilterCompat.get(filter));
- }
-
- private void checkRead() throws IOException {
- if (current == totalCountLoadedSoFar) {
- if (current != 0) {
- totalTimeSpentProcessingRecords += (System.currentTimeMillis() - startedAssemblingCurrentBlockAt);
- if (Log.INFO) {
- LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
- final long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
- if (totalTime != 0) {
- final long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
- final long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
- LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
- }
- }
- }
-
- LOG.info("at row " + current + ". reading next block");
- long t0 = System.currentTimeMillis();
- PageReadStore pages = reader.readNextRowGroup();
- if (pages == null) {
- throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
- }
- long timeSpentReading = System.currentTimeMillis() - t0;
- totalTimeSpentReadingBytes += timeSpentReading;
- BenchmarkCounter.incrementTime(timeSpentReading);
- if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
- if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
- MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
- recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
- startedAssemblingCurrentBlockAt = System.currentTimeMillis();
- totalCountLoadedSoFar += pages.getRowCount();
- ++ currentBlock;
- }
- }
-
- public void close() throws IOException {
- if (reader != null) {
- reader.close();
- }
- }
-
- public Void getCurrentKey() throws IOException, InterruptedException {
- return null;
- }
-
- public T getCurrentValue() throws IOException,
- InterruptedException {
- return currentValue;
- }
-
- public float getProgress() throws IOException, InterruptedException {
- return (float) current / total;
- }
-
- public void initialize(MessageType fileSchema,
- Map<String, String> fileMetadata,
- Path file, List<BlockMetaData> blocks, Configuration configuration)
- throws IOException {
- // initialize a ReadContext for this file
- ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
- configuration, toSetMultiMap(fileMetadata), fileSchema));
- this.requestedSchema = readContext.getRequestedSchema();
- this.fileSchema = fileSchema;
- this.file = file;
- this.columnCount = requestedSchema.getPaths().size();
- this.recordConverter = readSupport.prepareForRead(
- configuration, fileMetadata, fileSchema, readContext);
- this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
- List<ColumnDescriptor> columns = requestedSchema.getColumns();
- reader = new ParquetFileReader(configuration, file, blocks, columns);
- for (BlockMetaData block : blocks) {
- total += block.getRowCount();
- }
- LOG.info("RecordReader initialized will read a total of " + total + " records.");
- }
-
- private boolean contains(GroupType group, String[] path, int index) {
- if (index == path.length) {
- return false;
- }
- if (group.containsField(path[index])) {
- Type type = group.getType(path[index]);
- if (type.isPrimitive()) {
- return index + 1 == path.length;
- } else {
- return contains(type.asGroupType(), path, index + 1);
- }
- }
- return false;
- }
-
- public boolean nextKeyValue() throws IOException, InterruptedException {
- boolean recordFound = false;
-
- while (!recordFound) {
- // no more records left
- if (current >= total) { return false; }
-
- try {
- checkRead();
- currentValue = recordReader.read();
- current ++;
- if (recordReader.shouldSkipCurrentRecord()) {
- // this record is being filtered via the filter2 package
- if (DEBUG) LOG.debug("skipping record");
- continue;
- }
-
- if (currentValue == null) {
- // only happens with FilteredRecordReader at end of block
- current = totalCountLoadedSoFar;
- if (DEBUG) LOG.debug("filtered record reader reached end of block");
- continue;
- }
-
- recordFound = true;
-
- if (DEBUG) LOG.debug("read value: " + currentValue);
- } catch (RuntimeException e) {
- throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
- }
- }
- return true;
- }
-
- private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
- Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>();
- for (Map.Entry<K, V> entry : map.entrySet()) {
- Set<V> set = new HashSet<V>();
- set.add(entry.getValue());
- setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
- }
- return Collections.unmodifiableMap(setMultiMap);
- }
-
-}
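Finally, a lifecycle sketch for the deleted record reader, assuming a same-package caller (the class is package-private) and that the footer has already been parsed into the file schema, key/value metadata and block list; the helper is hypothetical.

package parquet.hadoop;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.filter2.compat.FilterCompat;
import parquet.hadoop.api.ReadSupport;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.schema.MessageType;

// Hypothetical driver (not part of the original sources) showing the reader's
// lifecycle: initialize from the parsed footer, iterate, close.
class RecordReaderSketch {
  static <T> long countRecords(ReadSupport<T> readSupport,
                               MessageType fileSchema,
                               Map<String, String> fileMetadata,
                               Path file,
                               List<BlockMetaData> blocks,
                               Configuration conf) throws IOException, InterruptedException {
    InternalParquetRecordReader<T> reader =
        new InternalParquetRecordReader<T>(readSupport, FilterCompat.NOOP);
    try {
      reader.initialize(fileSchema, fileMetadata, file, blocks, conf);
      long count = 0;
      // nextKeyValue() loads the next row group on demand and skips records dropped by the filter
      while (reader.nextKeyValue()) {
        T record = reader.getCurrentValue();
        count++;
      }
      return count;
    } finally {
      reader.close();
    }
  }
}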