From: blue@apache.org
To: commits@parquet.apache.org
Subject: parquet-mr git commit: PARQUET-384: Add dictionary filtering.
Date: Wed, 9 Mar 2016 21:20:47 +0000 (UTC)

Repository: parquet-mr
Updated Branches:
  refs/heads/master 1f91c79de -> 4b1ff8f4b


PARQUET-384: Add dictionary filtering.

This builds on #286 from @danielcweeks and cleans up some of the interfaces. It introduces `DictionaryPageReadStore` to expose dictionary pages to the filters and cleans up some internal calls by passing `ParquetFileReader`.

When committed, this closes #286.

Author: Ryan Blue
Author: Daniel Weeks

Closes #330 from rdblue/PARQUET-384-add-dictionary-filtering and squashes the following commits:

ff89424 [Ryan Blue] PARQUET-384: Add a cache to DictionaryPageReader.
1f6861c [Ryan Blue] PARQUET-384: Use ParquetFileReader to initialize readers.
21ef4b6 [Daniel Weeks] PARQUET-384: Add dictionary row group filter.
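Before the diff, a brief illustration of how the pieces introduced here fit together. This is an editor's sketch, not part of the patch: the file path, the column name ("binary_field"), and the predicate value are placeholders, while the calls themselves (`ParquetFileReader.open`, `getDictionaryReader`, `DictionaryFilter.canDrop`) are the APIs added in the diff below.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.page.DictionaryPageReadStore;
    import org.apache.parquet.filter2.dictionarylevel.DictionaryFilter;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.io.api.Binary;

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    public class DictionaryFilteringSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]); // any existing Parquet file

        // example predicate; "binary_field" is a placeholder column name
        FilterPredicate pred = eq(binaryColumn("binary_field"), Binary.fromString("c"));

        ParquetFileReader reader = ParquetFileReader.open(conf, file);
        try {
          for (BlockMetaData block : reader.getFooter().getBlocks()) {
            // DictionaryPageReadStore (new in this patch) hands back uncompressed dictionary pages
            DictionaryPageReadStore dictionaries = reader.getDictionaryReader(block);
            boolean drop = DictionaryFilter.canDrop(pred, block.getColumns(), dictionaries);
            System.out.println("row group at " + block.getStartingPos()
                + (drop ? " can be dropped" : " might match"));
          }
        } finally {
          reader.close();
        }
      }
    }

In normal reads this evaluation is wired in automatically through `RowGroupFilter` and `ParquetFileReader.filterRowGroups`, as shown in the diff below.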
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/4b1ff8f4
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/4b1ff8f4
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/4b1ff8f4

Branch: refs/heads/master
Commit: 4b1ff8f4b9dfa0ccb064ef286cf2953bfb2c492d
Parents: 1f91c79
Author: Ryan Blue
Authored: Wed Mar 9 13:20:37 2016 -0800
Committer: Ryan Blue
Committed: Wed Mar 9 13:20:37 2016 -0800

----------------------------------------------------------------------
 .../column/page/DictionaryPageReadStore.java    |  36 ++
 .../parquet/filter2/compat/RowGroupFilter.java  |  37 +-
 .../dictionarylevel/DictionaryFilter.java       | 356 +++++++++++++++++
 .../converter/ParquetMetadataConverter.java     |  47 ++-
 .../hadoop/ColumnChunkPageReadStore.java        |  10 +-
 .../parquet/hadoop/DictionaryPageReader.java    | 110 ++++++
 .../hadoop/InternalParquetRecordReader.java     |  39 +-
 .../parquet/hadoop/ParquetFileReader.java       | 320 ++++++++++++---
 .../apache/parquet/hadoop/ParquetReader.java    |  17 +-
 .../parquet/hadoop/ParquetRecordReader.java     |  71 ++--
 .../dictionarylevel/DictionaryFilterTest.java   | 387 +++++++++++++++++++
 11 files changed, 1286 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java
new file mode 100644
index 0000000..a0cd5f5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Interface to read dictionary pages for all the columns of a row group
+ */
+public interface DictionaryPageReadStore {
+
+  /**
+   * Returns a {@link DictionaryPage} for the given column descriptor.
+   * The dictionary page bytes are uncompressed.
+ * + * @param descriptor the descriptor of the column + * @return the DictionaryPage for that column, or null if there isn't one + */ + DictionaryPage readDictionaryPage(ColumnDescriptor descriptor); +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java index d85a231..fd74799 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java @@ -19,14 +19,17 @@ package org.apache.parquet.filter2.compat; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter; import org.apache.parquet.filter2.compat.FilterCompat.Visitor; +import org.apache.parquet.filter2.dictionarylevel.DictionaryFilter; import org.apache.parquet.filter2.predicate.FilterPredicate; import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator; import org.apache.parquet.filter2.statisticslevel.StatisticsFilter; +import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.schema.MessageType; @@ -40,15 +43,37 @@ import static org.apache.parquet.Preconditions.checkNotNull; public class RowGroupFilter implements Visitor> { private final List blocks; private final MessageType schema; + private final List levels; + private final ParquetFileReader reader; + + public enum FilterLevel { + STATISTICS, + DICTIONARY + } public static List filterRowGroups(Filter filter, List blocks, MessageType schema) { checkNotNull(filter, "filter"); return filter.accept(new RowGroupFilter(blocks, schema)); } + public static List filterRowGroups(List levels, Filter filter, List blocks, ParquetFileReader reader) { + checkNotNull(filter, "filter"); + return filter.accept(new RowGroupFilter(levels, blocks, reader)); + } + + @Deprecated private RowGroupFilter(List blocks, MessageType schema) { this.blocks = checkNotNull(blocks, "blocks"); this.schema = checkNotNull(schema, "schema"); + this.levels = Collections.singletonList(FilterLevel.STATISTICS); + this.reader = null; + } + + private RowGroupFilter(List levels, List blocks, ParquetFileReader reader) { + this.blocks = checkNotNull(blocks, "blocks"); + this.reader = checkNotNull(reader, "reader"); + this.schema = reader.getFileMetaData().getSchema(); + this.levels = levels; } @Override @@ -61,7 +86,17 @@ public class RowGroupFilter implements Visitor> { List filteredBlocks = new ArrayList(); for (BlockMetaData block : blocks) { - if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) { + boolean drop = false; + + if(levels.contains(FilterLevel.STATISTICS)) { + drop = StatisticsFilter.canDrop(filterPredicate, block.getColumns()); + } + + if(!drop && levels.contains(FilterLevel.DICTIONARY)) { + drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block)); + } + + if(!drop) { filteredBlocks.add(block); } } 
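To make the new `RowGroupFilter` entry point above concrete, here is a small usage sketch (an editor's addition, not part of the patch). It assumes an already-open `ParquetFileReader` and a `FilterPredicate`, as in the earlier example, and applies both levels: statistics first, then dictionaries for the row groups that survive, matching the visitor logic above.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.filter2.compat.RowGroupFilter;
    import org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;

    public class RowGroupFilterSketch {
      static List<BlockMetaData> keepMatchingRowGroups(ParquetFileReader reader, FilterPredicate pred) {
        // statistics checks are cheap, so run them first; dictionaries only for survivors
        List<FilterLevel> levels = Arrays.asList(FilterLevel.STATISTICS, FilterLevel.DICTIONARY);
        return RowGroupFilter.filterRowGroups(
            levels, FilterCompat.get(pred), reader.getFooter().getBlocks(), reader);
      }
    }

In the reader path itself, the same combination is driven by `ParquetFileReader.filterRowGroups`, gated by the `parquet.filter.statistics.enabled` (default true) and `parquet.filter.dictionary.enabled` (default false) settings introduced later in this patch.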
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java new file mode 100644 index 0000000..6235c20 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java @@ -0,0 +1,356 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.filter2.dictionarylevel; + +import org.apache.parquet.Log; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.DictionaryPageReadStore; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.*; +import org.apache.parquet.filter2.predicate.UserDefinedPredicate; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ColumnPath; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.Preconditions.checkArgument; +import static org.apache.parquet.Preconditions.checkNotNull; + + +/** + * Applies filters based on the contents of column dictionaries. 
+ */ +public class DictionaryFilter implements FilterPredicate.Visitor { + + private static final Log LOG = Log.getLog(DictionaryFilter.class); + private static final boolean BLOCK_MIGHT_MATCH = false; + private static final boolean BLOCK_CANNOT_MATCH = true; + + public static boolean canDrop(FilterPredicate pred, List columns, DictionaryPageReadStore dictionaries) { + checkNotNull(pred, "pred"); + checkNotNull(columns, "columns"); + return pred.accept(new DictionaryFilter(columns, dictionaries)); + } + + private final Map columns = new HashMap(); + private final DictionaryPageReadStore dictionaries; + + private DictionaryFilter(List columnsList, DictionaryPageReadStore dictionaries) { + for (ColumnChunkMetaData chunk : columnsList) { + columns.put(chunk.getPath(), chunk); + } + + this.dictionaries = dictionaries; + } + + private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) { + ColumnChunkMetaData c = columns.get(columnPath); + checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!"); + return c; + } + + @SuppressWarnings("unchecked") + private > Set expandDictionary(ColumnChunkMetaData meta) throws IOException { + ColumnDescriptor col = new ColumnDescriptor(meta.getPath().toArray(), meta.getType(), -1, -1); + DictionaryPage page = dictionaries.readDictionaryPage(col); + + // the chunk may not be dictionary-encoded + if (page == null) { + return null; + } + + Dictionary dict = page.getEncoding().initDictionary(col, page); + + Set dictSet = new HashSet(); + + for (int i=0; i<=dict.getMaxId(); i++) { + switch(meta.getType()) { + case BINARY: dictSet.add(dict.decodeToBinary(i)); + break; + case INT32: dictSet.add(dict.decodeToInt(i)); + break; + case INT64: dictSet.add(dict.decodeToLong(i)); + break; + case FLOAT: dictSet.add(dict.decodeToFloat(i)); + break; + case DOUBLE: dictSet.add(dict.decodeToDouble(i)); + break; + default: + LOG.warn("Unknown dictionary type" + meta.getType()); + } + } + + return (Set) dictSet; + } + + @Override + public > Boolean visit(Eq eq) { + Column filterColumn = eq.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + + // if the chunk has non-dictionary pages, don't bother decoding the + // dictionary because the row group can't be eliminated. + if (hasNonDictionaryPages(meta)) { + return BLOCK_MIGHT_MATCH; + } + + T value = eq.getValue(); + + filterColumn.getColumnPath(); + + try { + Set dictSet = expandDictionary(meta); + if (dictSet != null && !dictSet.contains(value)) { + return BLOCK_CANNOT_MATCH; + } + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; // cannot drop the row group based on this dictionary + } + + @Override + public > Boolean visit(NotEq notEq) { + Column filterColumn = notEq.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + + // if the chunk has non-dictionary pages, don't bother decoding the + // dictionary because the row group can't be eliminated. 
+ if (hasNonDictionaryPages(meta)) { + return BLOCK_MIGHT_MATCH; + } + + T value = notEq.getValue(); + + filterColumn.getColumnPath(); + + try { + Set dictSet = expandDictionary(meta); + if (dictSet != null && dictSet.size() == 1 && dictSet.contains(value)) { + return BLOCK_CANNOT_MATCH; + } + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(Lt lt) { + Column filterColumn = lt.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + + // if the chunk has non-dictionary pages, don't bother decoding the + // dictionary because the row group can't be eliminated. + if (hasNonDictionaryPages(meta)) { + return BLOCK_MIGHT_MATCH; + } + + T value = lt.getValue(); + + filterColumn.getColumnPath(); + + try { + Set dictSet = expandDictionary(meta); + if (dictSet == null) { + return BLOCK_MIGHT_MATCH; + } + + for(T entry : dictSet) { + if(value.compareTo(entry) > 0) { + return BLOCK_MIGHT_MATCH; + } + } + + return BLOCK_CANNOT_MATCH; + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(LtEq ltEq) { + Column filterColumn = ltEq.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + + // if the chunk has non-dictionary pages, don't bother decoding the + // dictionary because the row group can't be eliminated. + if (hasNonDictionaryPages(meta)) { + return BLOCK_MIGHT_MATCH; + } + + T value = ltEq.getValue(); + + filterColumn.getColumnPath(); + + try { + Set dictSet = expandDictionary(meta); + if (dictSet == null) { + return BLOCK_MIGHT_MATCH; + } + + for(T entry : dictSet) { + if(value.compareTo(entry) >= 0) { + return BLOCK_MIGHT_MATCH; + } + } + + return BLOCK_CANNOT_MATCH; + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(Gt gt) { + Column filterColumn = gt.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + + // if the chunk has non-dictionary pages, don't bother decoding the + // dictionary because the row group can't be eliminated. + if (hasNonDictionaryPages(meta)) { + return BLOCK_MIGHT_MATCH; + } + + T value = gt.getValue(); + + filterColumn.getColumnPath(); + + try { + Set dictSet = expandDictionary(meta); + if (dictSet == null) { + return BLOCK_MIGHT_MATCH; + } + + for(T entry : dictSet) { + if(value.compareTo(entry) < 0) { + return BLOCK_MIGHT_MATCH; + } + } + + return BLOCK_CANNOT_MATCH; + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + + @Override + public > Boolean visit(GtEq gtEq) { + Column filterColumn = gtEq.getColumn(); + ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath()); + + // if the chunk has non-dictionary pages, don't bother decoding the + // dictionary because the row group can't be eliminated. 
+ if (hasNonDictionaryPages(meta)) { + return BLOCK_MIGHT_MATCH; + } + + T value = gtEq.getValue(); + + filterColumn.getColumnPath(); + + try { + Set dictSet = expandDictionary(meta); + if (dictSet == null) { + return BLOCK_MIGHT_MATCH; + } + + for(T entry : dictSet) { + if(value.compareTo(entry) <= 0) { + return BLOCK_MIGHT_MATCH; + } + } + + return BLOCK_CANNOT_MATCH; + } catch (IOException e) { + LOG.warn("Failed to process dictionary for filter evaluation.", e); + } + + return BLOCK_MIGHT_MATCH; + } + + @Override + public Boolean visit(And and) { + return and.getLeft().accept(this) || and.getRight().accept(this); + } + + @Override + public Boolean visit(Or or) { + return or.getLeft().accept(this) && or.getRight().accept(this); + } + + @Override + public Boolean visit(Not not) { + throw new IllegalArgumentException( + "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not); + } + + @Override + public , U extends UserDefinedPredicate> Boolean visit(UserDefined udp) { + throw new UnsupportedOperationException("UDP not supported with dictionary evaluation."); + } + + @Override + public , U extends UserDefinedPredicate> Boolean visit(LogicalNotUserDefined udp) { + throw new UnsupportedOperationException("UDP not supported with dictionary evaluation."); + } + + @SuppressWarnings("deprecation") + private static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) { + // without EncodingStats, fall back to testing the encoding list + Set encodings = new HashSet(meta.getEncodings()); + if (encodings.remove(Encoding.PLAIN_DICTIONARY)) { + // if remove returned true, PLAIN_DICTIONARY was present, which means at + // least one page was dictionary encoded and 1.0 encodings are used + + // RLE and BIT_PACKED are only used for repetition or definition levels + encodings.remove(Encoding.RLE); + encodings.remove(Encoding.BIT_PACKED); + + if (encodings.isEmpty()) { + return false; // no encodings other than dictionary or rep/def levels + } + + return true; + + } else { + // if PLAIN_DICTIONARY wasn't present, then either the column is not + // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used. 
+ // for 2.0, this cannot determine whether a page fell back without + // page encoding stats + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index b373bfb..48f295e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -429,13 +429,14 @@ public class ParquetMetadataConverter { private static interface MetadataFilterVisitor { T visit(NoFilter filter) throws E; T visit(SkipMetadataFilter filter) throws E; - T visit(RangeMetadataFilter filter) throws E; + T visit(OffsetMetadataFilter filter) throws E; } public abstract static class MetadataFilter { private MetadataFilter() {} abstract T accept(MetadataFilterVisitor visitor) throws E; } + /** * [ startOffset, endOffset ) * @param startOffset @@ -445,6 +446,15 @@ public class ParquetMetadataConverter { public static MetadataFilter range(long startOffset, long endOffset) { return new RangeMetadataFilter(startOffset, endOffset); } + + public static MetadataFilter offsets(long... offsets) { + Set set = new HashSet(); + for (long offset : offsets) { + set.add(offset); + } + return new OffsetListMetadataFilter(set); + } + private static final class NoFilter extends MetadataFilter { private NoFilter() {} @Override @@ -467,39 +477,66 @@ public class ParquetMetadataConverter { return "SKIP_ROW_GROUPS"; } } + + interface OffsetMetadataFilter { + boolean contains(long offset); + } + /** * [ startOffset, endOffset ) * @author Julien Le Dem */ // Visible for testing - static final class RangeMetadataFilter extends MetadataFilter { + static final class RangeMetadataFilter extends MetadataFilter implements OffsetMetadataFilter { final long startOffset; final long endOffset; + RangeMetadataFilter(long startOffset, long endOffset) { super(); this.startOffset = startOffset; this.endOffset = endOffset; } + @Override T accept(MetadataFilterVisitor visitor) throws E { return visitor.visit(this); } - boolean contains(long offset) { + + @Override + public boolean contains(long offset) { return offset >= this.startOffset && offset < this.endOffset; } + @Override public String toString() { return "range(s:" + startOffset + ", e:" + endOffset + ")"; } } + static final class OffsetListMetadataFilter extends MetadataFilter implements OffsetMetadataFilter { + private final Set offsets; + + public OffsetListMetadataFilter(Set offsets) { + this.offsets = offsets; + } + + public boolean contains(long offset) { + return offsets.contains(offset); + } + + @Override + T accept(MetadataFilterVisitor visitor) throws E { + return visitor.visit(this); + } + } + @Deprecated public ParquetMetadata readParquetMetadata(InputStream from) throws IOException { return readParquetMetadata(from, NO_FILTER); } // Visible for testing - static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) { + static FileMetaData filterFileMetaData(FileMetaData metaData, OffsetMetadataFilter filter) { List rowGroups = metaData.getRow_groups(); List newRowGroups = new ArrayList(); for (RowGroup rowGroup : rowGroups) { 
@@ -544,7 +581,7 @@ public class ParquetMetadataConverter { } @Override - public FileMetaData visit(RangeMetadataFilter filter) throws IOException { + public FileMetaData visit(OffsetMetadataFilter filter) throws IOException { return filterFileMetaData(readFileMetaData(from), filter); } }); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index af06747..b0d0d30 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -31,6 +31,7 @@ import org.apache.parquet.column.page.DataPage; import org.apache.parquet.column.page.DataPageV1; import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.column.page.PageReader; import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor; @@ -42,7 +43,7 @@ import org.apache.parquet.io.ParquetDecodingException; * in our format: columns, chunks, and pages * */ -class ColumnChunkPageReadStore implements PageReadStore { +class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore { private static final Log LOG = Log.getLog(ColumnChunkPageReadStore.class); /** @@ -136,7 +137,7 @@ class ColumnChunkPageReadStore implements PageReadStore { compressedDictionaryPage.getDictionarySize(), compressedDictionaryPage.getEncoding()); } catch (IOException e) { - throw new RuntimeException(e); // TODO: cleanup + throw new ParquetDecodingException("Could not decompress dictionary page", e); } } } @@ -161,6 +162,11 @@ class ColumnChunkPageReadStore implements PageReadStore { return readers.get(path); } + @Override + public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) { + return readers.get(descriptor).readDictionaryPage(); + } + void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) { if (readers.put(path, reader) != null) { throw new RuntimeException(path+ " was added twice"); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java new file mode 100644 index 0000000..cb0d5e7 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import org.apache.parquet.Strings; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.DictionaryPageReadStore; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.io.ParquetDecodingException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY; +import static org.apache.parquet.column.Encoding.RLE_DICTIONARY; + +/** + * A {@link DictionaryPageReadStore} implementation that reads dictionaries from + * an open {@link ParquetFileReader}. + * + * This implementation will delegate dictionary reads to a + * {@link ColumnChunkPageReadStore} to avoid extra reads after a row group has + * been loaded into memory. + */ +class DictionaryPageReader implements DictionaryPageReadStore { + + private final ParquetFileReader reader; + private final Map columns; + private final Map cache = new HashMap(); + private ColumnChunkPageReadStore rowGroup = null; + + DictionaryPageReader(ParquetFileReader reader, BlockMetaData block) { + this.reader = reader; + this.columns = new HashMap(); + for (ColumnChunkMetaData column : block.getColumns()) { + columns.put(column.getPath().toDotString(), column); + } + } + + /** + * Sets this reader's row group's page store. When a row group is set, this + * reader will delegate to that row group to return dictionary pages. This + * avoids seeking and re-reading dictionary bytes after this reader's row + * group is loaded into memory. + * + * @param rowGroup a ColumnChunkPageReadStore for this reader's row group + */ + void setRowGroup(ColumnChunkPageReadStore rowGroup) { + this.rowGroup = rowGroup; + } + + @Override + public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) { + if (rowGroup != null) { + // if the row group has already been read, use that dictionary + return rowGroup.readDictionaryPage(descriptor); + } + + String dotPath = Strings.join(descriptor.getPath(), "."); + ColumnChunkMetaData column = columns.get(dotPath); + if (column == null) { + throw new ParquetDecodingException( + "Cannot load dictionary, unknown column: " + dotPath); + } + + if (cache.containsKey(dotPath)) { + return cache.get(dotPath); + } + + try { + synchronized (cache) { + // check the cache again in case this thread waited on another reading the same page + if (!cache.containsKey(dotPath)) { + DictionaryPage dict = hasDictionaryPage(column) ? 
reader.readDictionary(column) : null; + cache.put(dotPath, dict); + } + } + + return cache.get(dotPath); + } catch (IOException e) { + throw new ParquetDecodingException( + "Failed to read dictionary", e); + } + } + + private boolean hasDictionaryPage(ColumnChunkMetaData column) { + Set encodings = column.getEncodings(); + return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY)); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index c1bd037..f74e57c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -22,22 +22,18 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.Log; -import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; import org.apache.parquet.hadoop.api.InitContext; import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; import org.apache.parquet.io.ColumnIOFactory; @@ -45,9 +41,7 @@ import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.RecordMaterializer; import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; -import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; import static java.lang.String.format; import static org.apache.parquet.Log.DEBUG; @@ -81,7 +75,6 @@ class InternalParquetRecordReader { private long totalCountLoadedSoFar = 0; - private Path file; private UnmaterializableRecordCounter unmaterializableRecordCounter; /** @@ -163,46 +156,26 @@ class InternalParquetRecordReader { return (float) current / total; } - public void initialize(MessageType fileSchema, - FileMetaData parquetFileMetadata, - Path file, List blocks, Configuration configuration) + public void initialize(ParquetFileReader reader, Configuration configuration) throws IOException { // initialize a ReadContext for this file + this.reader = reader; + FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); + this.fileSchema = parquetFileMetadata.getSchema(); Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); ReadSupport.ReadContext readContext = readSupport.init(new InitContext( configuration, toSetMultiMap(fileMetadata), fileSchema)); this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); this.requestedSchema = readContext.getRequestedSchema(); - this.fileSchema = fileSchema; - this.file = file; this.columnCount = 
requestedSchema.getPaths().size(); this.recordConverter = readSupport.prepareForRead( configuration, fileMetadata, fileSchema, readContext); this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true); - List columns = requestedSchema.getColumns(); - reader = new ParquetFileReader(configuration, parquetFileMetadata, file, blocks, columns); - for (BlockMetaData block : blocks) { - total += block.getRowCount(); - } + this.total = reader.getRecordCount(); this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total); LOG.info("RecordReader initialized will read a total of " + total + " records."); } - private boolean contains(GroupType group, String[] path, int index) { - if (index == path.length) { - return false; - } - if (group.containsField(path[index])) { - Type type = group.getType(path[index]); - if (type.isPrimitive()) { - return index + 1 == path.length; - } else { - return contains(type.asGroupType(), path, index + 1); - } - } - return false; - } - public boolean nextKeyValue() throws IOException, InterruptedException { boolean recordFound = false; @@ -240,7 +213,7 @@ class InternalParquetRecordReader { if (DEBUG) LOG.debug("read value: " + currentValue); } catch (RuntimeException e) { - throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e); + throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e); } } return true; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 55ed5ee..3d7b499 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -20,6 +20,8 @@ package org.apache.parquet.hadoop; import static org.apache.parquet.Log.DEBUG; import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian; +import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY; +import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS; import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS; import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; @@ -56,6 +58,10 @@ import org.apache.hadoop.fs.Path; import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.HeapByteBufferAllocator; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.RowGroupFilter; import org.apache.parquet.hadoop.util.CompatibilityUtil; import org.apache.parquet.Log; @@ -422,58 +428,74 @@ public class ParquetFileReader implements Closeable { */ public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException { FileSystem fileSystem = 
file.getPath().getFileSystem(configuration); - FSDataInputStream f = fileSystem.open(file.getPath()); + FSDataInputStream in = fileSystem.open(file.getPath()); try { - long l = file.getLen(); - if (Log.DEBUG) { - LOG.debug("File length " + l); - } - int FOOTER_LENGTH_SIZE = 4; - if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC - throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)"); - } - long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length; - if (Log.DEBUG) { - LOG.debug("reading footer index at " + footerLengthIndex); - } - - f.seek(footerLengthIndex); - int footerLength = readIntLittleEndian(f); - byte[] magic = new byte[MAGIC.length]; - f.readFully(magic); - if (!Arrays.equals(MAGIC, magic)) { - throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic)); - } - long footerIndex = footerLengthIndex - footerLength; - if (Log.DEBUG) { - LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex); - } - if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) { - throw new RuntimeException("corrupted file: the footer index is not within the file"); - } - f.seek(footerIndex); - return converter.readParquetMetadata(f, filter); + return readFooter(file, in, filter); } finally { - f.close(); + in.close(); } } - static ParquetFileReader open(Configuration conf, Path file) throws IOException { - ParquetMetadata footer = readFooter(conf, file, NO_FILTER); - return new ParquetFileReader(conf, footer.getFileMetaData(), file, - footer.getBlocks(), footer.getFileMetaData().getSchema().getColumns()); + private static final ParquetMetadata readFooter(FileStatus file, FSDataInputStream f, MetadataFilter filter) throws IOException { + long l = file.getLen(); + if (Log.DEBUG) { + LOG.debug("File length " + l); + } + int FOOTER_LENGTH_SIZE = 4; + if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC + throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)"); + } + long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length; + if (Log.DEBUG) { + LOG.debug("reading footer index at " + footerLengthIndex); + } + + f.seek(footerLengthIndex); + int footerLength = readIntLittleEndian(f); + byte[] magic = new byte[MAGIC.length]; + f.readFully(magic); + if (!Arrays.equals(MAGIC, magic)) { + throw new RuntimeException(file.getPath() + " is not a Parquet file. 
expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic)); + } + long footerIndex = footerLengthIndex - footerLength; + if (Log.DEBUG) { + LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex); + } + if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) { + throw new RuntimeException("corrupted file: the footer index is not within the file"); + } + f.seek(footerIndex); + return converter.readParquetMetadata(f, filter); + } + + public static ParquetFileReader open(Configuration conf, Path file) throws IOException { + return new ParquetFileReader(conf, file); + } + + public static ParquetFileReader open(Configuration conf, Path file, MetadataFilter filter) throws IOException { + return new ParquetFileReader(conf, file, filter); + } + + public static ParquetFileReader open(Configuration conf, Path file, ParquetMetadata footer) throws IOException { + return new ParquetFileReader(conf, file, footer); } private final CodecFactory codecFactory; - private final List blocks; private final FSDataInputStream f; - private final Path filePath; + private final FileStatus fileStatus; private final Map paths = new HashMap(); - private final FileMetaData fileMetaData; - private final String createdBy; + private final FileMetaData fileMetaData; // may be null private final ByteBufferAllocator allocator; + private final Configuration conf; + + // not final. in some cases, this may be lazily loaded for backward-compat. + private ParquetMetadata footer; + // blocks can be filtered after they are read (or set in the constructor) + private List blocks; private int currentBlock = 0; + private ColumnChunkPageReadStore currentRowGroup = null; + private DictionaryPageReader nextDictionaryReader = null; /** * @deprecated use @link{ParquetFileReader(Configuration configuration, FileMetaData fileMetaData, @@ -490,14 +512,15 @@ public class ParquetFileReader implements Closeable { * @param columns the columns to read (their path) * @throws IOException if the file can not be opened */ + @Deprecated public ParquetFileReader( Configuration configuration, FileMetaData fileMetaData, Path filePath, List blocks, List columns) throws IOException { - this.filePath = filePath; + this.conf = configuration; this.fileMetaData = fileMetaData; - this.createdBy = fileMetaData == null ? 
null : fileMetaData.getCreatedBy(); FileSystem fs = filePath.getFileSystem(configuration); this.f = fs.open(filePath); + this.fileStatus = fs.getFileStatus(filePath); this.blocks = blocks; for (ColumnDescriptor col : columns) { paths.put(ColumnPath.get(col.getPath()), col); @@ -508,6 +531,111 @@ public class ParquetFileReader implements Closeable { this.allocator = new HeapByteBufferAllocator(); } + /** + * @param configuration the Hadoop Configuration + * @param file Path to a parquet file + * @throws IOException if the file can not be opened + */ + private ParquetFileReader(Configuration configuration, Path file) throws IOException { + this(configuration, file, NO_FILTER); + } + + /** + * @param conf the Hadoop Configuration + * @param file Path to a parquet file + * @param filter a {@link MetadataFilter} for selecting row groups + * @throws IOException if the file can not be opened + */ + public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException { + this.conf = conf; + FileSystem fs = file.getFileSystem(conf); + this.fileStatus = fs.getFileStatus(file); + this.f = fs.open(file); + this.footer = readFooter(fileStatus, f, filter); + this.fileMetaData = footer.getFileMetaData(); + this.blocks = footer.getBlocks(); + for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { + paths.put(ColumnPath.get(col.getPath()), col); + } + // the page size parameter isn't meaningful when only using + // the codec factory to get decompressors + this.codecFactory = new CodecFactory(conf, 0); + this.allocator = new HeapByteBufferAllocator(); + } + + /** + * @param conf the Hadoop Configuration + * @param file Path to a parquet file + * @param footer a {@link ParquetMetadata} footer already read from the file + * @throws IOException if the file can not be opened + */ + public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException { + this.conf = conf; + FileSystem fs = file.getFileSystem(conf); + this.fileStatus = fs.getFileStatus(file); + this.f = fs.open(file); + this.footer = footer; + this.fileMetaData = footer.getFileMetaData(); + this.blocks = footer.getBlocks(); + for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) { + paths.put(ColumnPath.get(col.getPath()), col); + } + // the page size parameter isn't meaningful when only using + // the codec factory to get decompressors + this.codecFactory = new CodecFactory(conf, 0); + this.allocator = new HeapByteBufferAllocator(); + } + + public ParquetMetadata getFooter() { + if (footer == null) { + try { + // don't read the row groups because this.blocks is always set + this.footer = readFooter(fileStatus, f, SKIP_ROW_GROUPS); + } catch (IOException e) { + throw new ParquetDecodingException("Unable to read file footer", e); + } + } + return footer; + } + + public FileMetaData getFileMetaData() { + if (fileMetaData != null) { + return fileMetaData; + } + return getFooter().getFileMetaData(); + } + + public long getRecordCount() { + long total = 0; + for (BlockMetaData block : blocks) { + total += block.getRowCount(); + } + return total; + } + + public Path getPath() { + return fileStatus.getPath(); + } + + void filterRowGroups(FilterCompat.Filter filter) throws IOException { + // set up data filters based on configured levels + List levels = new ArrayList(); + + if (conf.getBoolean("parquet.filter.statistics.enabled", true)) { + levels.add(STATISTICS); + } + + if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) { + 
levels.add(DICTIONARY); + } + + this.blocks = RowGroupFilter.filterRowGroups(levels, filter, blocks, this); + } + + public List getRowGroups() { + return blocks; + } + public void appendTo(ParquetFileWriter writer) throws IOException { writer.appendRowGroups(f, blocks, true); } @@ -525,7 +653,7 @@ public class ParquetFileReader implements Closeable { if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } - ColumnChunkPageReadStore columnChunkPageReadStore = new ColumnChunkPageReadStore(block.getRowCount()); + this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount()); // prepare the list of consecutive chunks to read them in one scan List allChunks = new ArrayList(); ConsecutiveChunkList currentChunks = null; @@ -547,19 +675,113 @@ public class ParquetFileReader implements Closeable { for (ConsecutiveChunkList consecutiveChunks : allChunks) { final List chunks = consecutiveChunks.readAll(f); for (Chunk chunk : chunks) { - columnChunkPageReadStore.addColumn(chunk.descriptor.col, chunk.readAllPages()); + currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); } } + + // avoid re-reading bytes the dictionary reader is used after this call + if (nextDictionaryReader != null) { + nextDictionaryReader.setRowGroup(currentRowGroup); + } + + advanceToNextBlock(); + + return currentRowGroup; + } + + public boolean skipNextRowGroup() { + return advanceToNextBlock(); + } + + private boolean advanceToNextBlock() { + if (currentBlock == blocks.size()) { + return false; + } + + // update the current block and instantiate a dictionary reader for it ++currentBlock; - return columnChunkPageReadStore; + this.nextDictionaryReader = null; + + return true; } + /** + * Returns a {@link DictionaryPageReadStore} for the row group that would be + * returned by calling {@link #readNextRowGroup()} or skipped by calling + * {@link #skipNextRowGroup()}. + * + * @return a DictionaryPageReadStore for the next row group + */ + public DictionaryPageReadStore getNextDictionaryReader() { + if (nextDictionaryReader == null && currentBlock < blocks.size()) { + this.nextDictionaryReader = getDictionaryReader(blocks.get(currentBlock)); + } + return nextDictionaryReader; + } + + public DictionaryPageReader getDictionaryReader(BlockMetaData block) { + return new DictionaryPageReader(this, block); + } + + /** + * Reads and decompresses a dictionary page for the given column chunk. + * + * Returns null if the given column chunk has no dictionary page. + * + * @param meta a column's ColumnChunkMetaData to read the dictionary from + * @return an uncompressed DictionaryPage or null + * @throws IOException + */ + DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException { + if (!meta.getEncodings().contains(Encoding.PLAIN_DICTIONARY) && + !meta.getEncodings().contains(Encoding.RLE_DICTIONARY)) { + return null; + } + + // TODO: this should use getDictionaryPageOffset() but it isn't reliable. + if (f.getPos() != meta.getStartingPos()) { + f.seek(meta.getStartingPos()); + } + + PageHeader pageHeader = Util.readPageHeader(f); + if (!pageHeader.isSetDictionary_page_header()) { + return null; // TODO: should this complain? 
+ } + + DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f); + BytesDecompressor decompressor = codecFactory.getDecompressor(meta.getCodec()); + + return new DictionaryPage( + decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()), + compressedPage.getDictionarySize(), + compressedPage.getEncoding()); + } + + private static DictionaryPage readCompressedDictionary( + PageHeader pageHeader, FSDataInputStream fin) throws IOException { + DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header(); + int uncompressedPageSize = pageHeader.getUncompressed_page_size(); + int compressedPageSize = pageHeader.getCompressed_page_size(); + + byte [] dictPageBytes = new byte[compressedPageSize]; + fin.readFully(dictPageBytes); + + BytesInput bin = BytesInput.from(dictPageBytes); + + return new DictionaryPage( + bin, uncompressedPageSize, dictHeader.getNum_values(), + converter.getEncoding(dictHeader.getEncoding())); + } @Override public void close() throws IOException { - f.close(); - this.codecFactory.release(); + if (f != null) { + f.close(); + } + if (codecFactory != null) { + codecFactory.release(); + } } /** @@ -622,7 +844,7 @@ public class ParquetFileReader implements Closeable { dataHeaderV1.getNum_values(), uncompressedPageSize, fromParquetStatistics( - createdBy, + getFileMetaData().getCreatedBy(), dataHeaderV1.getStatistics(), descriptor.col.getType()), converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()), @@ -645,7 +867,7 @@ public class ParquetFileReader implements Closeable { this.readAsBytesInput(dataSize), uncompressedPageSize, fromParquetStatistics( - createdBy, + getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), descriptor.col.getType()), dataHeaderV2.isIs_compressed() @@ -664,7 +886,7 @@ public class ParquetFileReader implements Closeable { // Would be nice to have a CorruptParquetFileException or something as a subclass? throw new IOException( "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " + - filePath + " offset " + descriptor.metadata.getFirstDataPageOffset() + + getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() + " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size() + " pages ending at file offset " + (descriptor.fileOffset + pos())); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index 7cbb04a..ff9c811 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -35,11 +35,8 @@ import org.apache.parquet.Preconditions; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; -import org.apache.parquet.filter2.compat.RowGroupFilter; import org.apache.parquet.hadoop.api.ReadSupport; -import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.util.HiddenFileFilter; -import org.apache.parquet.schema.MessageType; /** * Read records from a Parquet file. 
@@ -141,17 +138,15 @@ public class ParquetReader implements Closeable { if (footersIterator.hasNext()) { Footer footer = footersIterator.next(); - List blocks = footer.getParquetMetadata().getBlocks(); + ParquetFileReader fileReader = ParquetFileReader.open( + conf, footer.getFile(), footer.getParquetMetadata()); - MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema(); - - List filteredBlocks = RowGroupFilter.filterRowGroups( - filter, blocks, fileSchema); + // apply data filters + fileReader.filterRowGroups(filter); reader = new InternalParquetRecordReader(readSupport, filter); - reader.initialize(fileSchema, - footer.getParquetMetadata().getFileMetaData(), - footer.getFile(), filteredBlocks, conf); + + reader.initialize(fileReader, conf); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java index 1558fc0..eae3b4e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java @@ -18,10 +18,9 @@ */ package org.apache.parquet.hadoop; -import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups; -import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER; +import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.*; +import static org.apache.parquet.format.converter.ParquetMetadataConverter.offsets; import static org.apache.parquet.format.converter.ParquetMetadataConverter.range; -import static org.apache.parquet.hadoop.ParquetFileReader.readFooter; import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; @@ -47,15 +46,15 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.filter.UnboundRecordFilter; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel; +import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; import org.apache.parquet.hadoop.api.ReadSupport; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.FileMetaData; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; import org.apache.parquet.io.ParquetDecodingException; -import org.apache.parquet.schema.MessageType; /** * Reads the records from a block of a Parquet file @@ -154,52 +153,38 @@ public class ParquetRecordReader extends RecordReader { private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException { Path path = split.getPath(); long[] rowGroupOffsets = split.getRowGroupOffsets(); - List filteredBlocks; - ParquetMetadata footer; + // if task.side.metadata is set, rowGroupOffsets is null - if (rowGroupOffsets == null) { - // then we need to apply the predicate push down filter - footer = readFooter(configuration, path, range(split.getStart(), 
split.getEnd())); - MessageType fileSchema = footer.getFileMetaData().getSchema(); - Filter filter = getFilter(configuration); - filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema); - } else { - // otherwise we find the row groups that were selected on the client - footer = readFooter(configuration, path, NO_FILTER); - Set offsets = new HashSet(); - for (long offset : rowGroupOffsets) { - offsets.add(offset); - } - filteredBlocks = new ArrayList(); - for (BlockMetaData block : footer.getBlocks()) { - if (offsets.contains(block.getStartingPos())) { - filteredBlocks.add(block); - } - } - // verify we found them all - if (filteredBlocks.size() != rowGroupOffsets.length) { - long[] foundRowGroupOffsets = new long[footer.getBlocks().size()]; - for (int i = 0; i < foundRowGroupOffsets.length; i++) { - foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos(); - } - // this should never happen. - // provide a good error message in case there's a bug + MetadataFilter metadataFilter = (rowGroupOffsets != null ? + offsets(rowGroupOffsets) : + range(split.getStart(), split.getEnd())); + + // open a reader with the metadata filter + ParquetFileReader reader = ParquetFileReader.open( + configuration, path, metadataFilter); + + if (rowGroupOffsets != null) { + // verify a row group was found for each offset + List blocks = reader.getFooter().getBlocks(); + if (blocks.size() != rowGroupOffsets.length) { throw new IllegalStateException( - "All the offsets listed in the split should be found in the file." + "All of the offsets in the split should be found in the file." + " expected: " + Arrays.toString(rowGroupOffsets) - + " found: " + filteredBlocks - + " out of: " + Arrays.toString(foundRowGroupOffsets) - + " in range " + split.getStart() + ", " + split.getEnd()); + + " found: " + blocks); } + + } else { + // apply data filters + reader.filterRowGroups(getFilter(configuration)); } - if (!filteredBlocks.isEmpty()) { - checkDeltaByteArrayProblem(footer.getFileMetaData(), configuration, filteredBlocks.get(0)); + if (!reader.getRowGroups().isEmpty()) { + checkDeltaByteArrayProblem( + reader.getFooter().getFileMetaData(), configuration, + reader.getRowGroups().get(0)); } - MessageType fileSchema = footer.getFileMetaData().getSchema(); - internalReader.initialize( - fileSchema, footer.getFileMetaData(), path, filteredBlocks, configuration); + internalReader.initialize(reader, configuration); } private void checkDeltaByteArrayProblem(FileMetaData meta, Configuration conf, BlockMetaData block) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java new file mode 100644 index 0000000..754da68 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.filter2.dictionarylevel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.page.DictionaryPageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.Operators.BinaryColumn; +import org.apache.parquet.filter2.predicate.Operators.DoubleColumn; +import org.apache.parquet.filter2.predicate.Operators.FloatColumn; +import org.apache.parquet.filter2.predicate.Operators.IntColumn; +import org.apache.parquet.filter2.predicate.Operators.LongColumn; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; +import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop; +import static org.apache.parquet.filter2.predicate.FilterApi.*; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class DictionaryFilterTest { + + private static final int nElements = 1000; + private static final Configuration conf = new Configuration(); + private static Path file = new Path("target/test/TestDictionaryFilter/testParquetFile"); + private static final MessageType schema = parseMessageType( + "message test { " + + "required binary binary_field; " + + "required binary single_value_field; " + + "required int32 int32_field; " + + "required int64 int64_field; " + + "required double double_field; " + + "required float float_field; " + + "required int32 plain_int32_field; " + + "required binary fallback_binary_field; " + + "} "); + + private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; + private static final int[] intValues = new int[] { + -100, 302, 3333333, 7654321, 1234567, -2000, -77775, 0, 75, 
22223, + 77, 22221, -444443, 205, 12, 44444, 889, 66665, -777889, -7, + 52, 33, -257, 1111, 775, 26}; + private static final long[] longValues = new long[] { + -100L, 302L, 3333333L, 7654321L, 1234567L, -2000L, -77775L, 0L, + 75L, 22223L, 77L, 22221L, -444443L, 205L, 12L, 44444L, 889L, 66665L, + -777889L, -7L, 52L, 33L, -257L, 1111L, 775L, 26L}; + + private static void writeData(SimpleGroupFactory f, ParquetWriter writer) throws IOException { + for (int i = 0; i < nElements; i++) { + int index = i % ALPHABET.length(); + + Group group = f.newGroup() + .append("binary_field", ALPHABET.substring(index, index+1)) + .append("single_value_field", "sharp") + .append("int32_field", intValues[i % intValues.length]) + .append("int64_field", longValues[i % longValues.length]) + .append("double_field", toDouble(intValues[i % intValues.length])) + .append("float_field", toFloat(intValues[i % intValues.length])) + .append("plain_int32_field", i) + .append("fallback_binary_field", i < (nElements / 2) ? + ALPHABET.substring(index, index+1) : UUID.randomUUID().toString()); + + writer.write(group); + } + writer.close(); + } + + @BeforeClass + public static void prepareFile() throws IOException { + cleanup(); + + GroupWriteSupport.setSchema(schema, conf); + SimpleGroupFactory f = new SimpleGroupFactory(schema); + ParquetWriter writer = ExampleParquetWriter.builder(file) + .withWriterVersion(PARQUET_1_0) + .withCompressionCodec(UNCOMPRESSED) + .withRowGroupSize(1024*1024) + .withPageSize(1024) + .enableDictionaryEncoding() + .withDictionaryPageSize(2*1024) + .withConf(conf) + .build(); + writeData(f, writer); + } + + @AfterClass + public static void cleanup() throws IOException { + FileSystem fs = file.getFileSystem(conf); + if (fs.exists(file)) { + fs.delete(file, true); + } + } + + + List ccmd; + ParquetFileReader reader; + DictionaryPageReadStore dictionaries; + + @Before + public void setUp() throws Exception { + reader = ParquetFileReader.open(conf, file); + ParquetMetadata meta = reader.getFooter(); + ccmd = meta.getBlocks().get(0).getColumns(); + dictionaries = reader.getDictionaryReader(meta.getBlocks().get(0)); + } + + @After + public void tearDown() throws Exception { + reader.close(); + } + + @Test + @SuppressWarnings("deprecation") + public void testDictionaryEncodedColumns() throws Exception { + Set dictionaryEncodedColumns = new HashSet(Arrays.asList( + "binary_field", "single_value_field", "int32_field", "int64_field", + "double_field", "float_field")); + for (ColumnChunkMetaData column : ccmd) { + String name = column.getPath().toDotString(); + if (dictionaryEncodedColumns.contains(name)) { + assertTrue("Column should be dictionary encoded: " + name, + column.getEncodings().contains(Encoding.PLAIN_DICTIONARY)); + assertFalse("Column should not have plain data pages: " + name, + column.getEncodings().contains(Encoding.PLAIN)); + + } else { + assertTrue("Column should have plain encoding: " + name, + column.getEncodings().contains(Encoding.PLAIN)); + + if (name.startsWith("fallback")) { + assertTrue("Column should have some dictionary encoding: " + name, + column.getEncodings().contains(Encoding.PLAIN_DICTIONARY)); + } else { + assertFalse("Column should have no dictionary encoding: " + name, + column.getEncodings().contains(Encoding.PLAIN_DICTIONARY)); + } + } + } + } + + @Test + public void testEqBinary() throws Exception { + BinaryColumn b = binaryColumn("binary_field"); + FilterPredicate pred = eq(b, Binary.fromString("c")); + + assertFalse("Should not drop block for lower case letters", + 
canDrop(pred, ccmd, dictionaries)); + + assertTrue("Should drop block for upper case letters", + canDrop(eq(b, Binary.fromString("A")), ccmd, dictionaries)); + } + + @Test + public void testNotEqBinary() throws Exception { + BinaryColumn sharp = binaryColumn("single_value_field"); + BinaryColumn b = binaryColumn("binary_field"); + + assertTrue("Should drop block with only the excluded value", + canDrop(notEq(sharp, Binary.fromString("sharp")), ccmd, dictionaries)); + + assertFalse("Should not drop block with any other value", + canDrop(notEq(sharp, Binary.fromString("applause")), ccmd, dictionaries)); + + assertFalse("Should not drop block with a known value", + canDrop(notEq(b, Binary.fromString("x")), ccmd, dictionaries)); + + assertFalse("Should not drop block with a known value", + canDrop(notEq(b, Binary.fromString("B")), ccmd, dictionaries)); + } + + @Test + public void testLtInt() throws Exception { + IntColumn i32 = intColumn("int32_field"); + int lowest = Integer.MAX_VALUE; + for (int value : intValues) { + lowest = Math.min(lowest, value); + } + + assertTrue("Should drop: < lowest value", + canDrop(lt(i32, lowest), ccmd, dictionaries)); + assertFalse("Should not drop: < (lowest value + 1)", + canDrop(lt(i32, lowest + 1), ccmd, dictionaries)); + + assertFalse("Should not drop: contains matching values", + canDrop(lt(i32, Integer.MAX_VALUE), ccmd, dictionaries)); + } + + @Test + public void testLtEqLong() throws Exception { + LongColumn i64 = longColumn("int64_field"); + long lowest = Long.MAX_VALUE; + for (long value : longValues) { + lowest = Math.min(lowest, value); + } + + assertTrue("Should drop: <= lowest - 1", + canDrop(ltEq(i64, lowest - 1), ccmd, dictionaries)); + assertFalse("Should not drop: <= lowest", + canDrop(ltEq(i64, lowest), ccmd, dictionaries)); + + assertFalse("Should not drop: contains matching values", + canDrop(ltEq(i64, Long.MAX_VALUE), ccmd, dictionaries)); + } + + @Test + public void testGtFloat() throws Exception { + FloatColumn f = floatColumn("float_field"); + float highest = Float.MIN_VALUE; + for (int value : intValues) { + highest = Math.max(highest, toFloat(value)); + } + + assertTrue("Should drop: > highest value", + canDrop(gt(f, highest), ccmd, dictionaries)); + assertFalse("Should not drop: > (highest value - 1.0)", + canDrop(gt(f, highest - 1.0f), ccmd, dictionaries)); + + assertFalse("Should not drop: contains matching values", + canDrop(gt(f, Float.MIN_VALUE), ccmd, dictionaries)); + } + + @Test + public void testGtEqDouble() throws Exception { + DoubleColumn d = doubleColumn("double_field"); + double highest = Double.MIN_VALUE; + for (int value : intValues) { + highest = Math.max(highest, toDouble(value)); + } + + assertTrue("Should drop: >= highest + 0.00000001", + canDrop(gtEq(d, highest + 0.00000001), ccmd, dictionaries)); + assertFalse("Should not drop: >= highest", + canDrop(gtEq(d, highest), ccmd, dictionaries)); + + assertFalse("Should not drop: contains matching values", + canDrop(gtEq(d, Double.MIN_VALUE), ccmd, dictionaries)); + } + + @Test + public void testAnd() throws Exception { + BinaryColumn col = binaryColumn("binary_field"); + + // both evaluate to false (no upper-case letters are in the dictionary) + FilterPredicate B = eq(col, Binary.fromString("B")); + FilterPredicate C = eq(col, Binary.fromString("C")); + + // both evaluate to true (all lower-case letters are in the dictionary) + FilterPredicate x = eq(col, Binary.fromString("x")); + FilterPredicate y = eq(col, Binary.fromString("y")); + + assertTrue("Should drop when 
either predicate must be false", + canDrop(and(B, y), ccmd, dictionaries)); + assertTrue("Should drop when either predicate must be false", + canDrop(and(x, C), ccmd, dictionaries)); + assertTrue("Should drop when either predicate must be false", + canDrop(and(B, C), ccmd, dictionaries)); + assertFalse("Should not drop when either predicate could be true", + canDrop(and(x, y), ccmd, dictionaries)); + } + + @Test + public void testOr() throws Exception { + BinaryColumn col = binaryColumn("binary_field"); + + // both evaluate to false (no upper-case letters are in the dictionary) + FilterPredicate B = eq(col, Binary.fromString("B")); + FilterPredicate C = eq(col, Binary.fromString("C")); + + // both evaluate to true (all lower-case letters are in the dictionary) + FilterPredicate x = eq(col, Binary.fromString("x")); + FilterPredicate y = eq(col, Binary.fromString("y")); + + assertFalse("Should not drop when one predicate could be true", + canDrop(or(B, y), ccmd, dictionaries)); + assertFalse("Should not drop when one predicate could be true", + canDrop(or(x, C), ccmd, dictionaries)); + assertTrue("Should drop when both predicates must be false", + canDrop(or(B, C), ccmd, dictionaries)); + assertFalse("Should not drop when one predicate could be true", + canDrop(or(x, y), ccmd, dictionaries)); + } + + @Test + public void testColumnWithoutDictionary() throws Exception { + IntColumn plain = intColumn("plain_int32_field"); + DictionaryPageReadStore dictionaryStore = mock(DictionaryPageReadStore.class); + + assertFalse("Should never drop block using plain encoding", + canDrop(eq(plain, -10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(lt(plain, -10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(ltEq(plain, -10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(gt(plain, nElements + 10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(gtEq(plain, nElements + 10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(notEq(plain, nElements + 10), ccmd, dictionaryStore)); + + verifyZeroInteractions(dictionaryStore); + } + + @Test + public void testColumnWithDictionaryAndPlainEncodings() throws Exception { + IntColumn plain = intColumn("fallback_binary_field"); + DictionaryPageReadStore dictionaryStore = mock(DictionaryPageReadStore.class); + + assertFalse("Should never drop block using plain encoding", + canDrop(eq(plain, -10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(lt(plain, -10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(ltEq(plain, -10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(gt(plain, nElements + 10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(gtEq(plain, nElements + 10), ccmd, dictionaryStore)); + + assertFalse("Should never drop block using plain encoding", + canDrop(notEq(plain, nElements + 10), ccmd, dictionaryStore)); + + verifyZeroInteractions(dictionaryStore); + } + + private static double toDouble(int value) { + return (value * 1.0); + } + + private static float toFloat(int value) { + return (float) (value * 2.0); + } +}
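
For reference, the test above drives DictionaryFilter.canDrop directly. The following standalone sketch (not part of this commit) shows how the entry points introduced here fit together: open a ParquetFileReader, fetch the DictionaryPageReadStore for a row group, and check a predicate against it. The file path, column name, and predicate value are hypothetical placeholders.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.page.DictionaryPageReadStore;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.io.api.Binary;

    import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    public class DictionaryFilterSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical path; any file with a dictionary-encoded binary column will do
        Path file = new Path("/tmp/example.parquet");

        ParquetFileReader reader = ParquetFileReader.open(conf, file);
        try {
          ParquetMetadata footer = reader.getFooter();
          BlockMetaData rowGroup = footer.getBlocks().get(0);
          List<ColumnChunkMetaData> columns = rowGroup.getColumns();

          // dictionary pages for this row group, exposed through the new read store
          DictionaryPageReadStore dictionaries = reader.getDictionaryReader(rowGroup);

          // predicate on a dictionary-encoded column ("binary_field" is a placeholder)
          FilterPredicate pred = eq(binaryColumn("binary_field"), Binary.fromString("A"));

          // true only when the dictionary proves no value in the row group can match
          System.out.println("can drop row group 0: " + canDrop(pred, columns, dictionaries));
        } finally {
          reader.close();
        }
      }
    }

In normal use this check is not invoked by hand: ParquetReader and ParquetRecordReader now call fileReader.filterRowGroups(filter) as shown in the hunks above, which applies the data filters (including the dictionary-level filter) before records are read.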