parquet-commits mailing list archives

From b...@apache.org
Subject parquet-mr git commit: PARQUET-384: Add dictionary filtering.
Date Wed, 09 Mar 2016 21:20:47 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 1f91c79de -> 4b1ff8f4b


PARQUET-384: Add dictionary filtering.

This builds on #286 from @danielcweeks and cleans up some of the interfaces. It introduces `DictionaryPageReadStore` to expose dictionary pages to the filters and simplifies several internal calls by passing a `ParquetFileReader`.

When committed, this closes #286.
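
As a rough illustration of how an application could opt in once this change is available (a minimal sketch, not part of this patch; the file path, column name, and use of the example `GroupReadSupport` are hypothetical), dictionary filtering is controlled by the new `parquet.filter.dictionary.enabled` flag read in `ParquetFileReader.filterRowGroups`:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;
    import org.apache.parquet.io.api.Binary;

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    public class DictionaryFilteringExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // statistics filtering stays on by default; dictionary filtering is opt-in
        conf.setBoolean("parquet.filter.dictionary.enabled", true);

        ParquetReader<Group> reader = ParquetReader
            .builder(new GroupReadSupport(), new Path("/tmp/events.parquet")) // hypothetical file
            .withConf(conf)
            .withFilter(FilterCompat.get(
                eq(binaryColumn("event_type"), Binary.fromString("click")))) // hypothetical column
            .build();

        Group record;
        while ((record = reader.read()) != null) {
          // row groups whose dictionaries cannot contain "click" are dropped without reading data pages
        }
        reader.close();
      }
    }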

Author: Ryan Blue <blue@apache.org>
Author: Daniel Weeks <dweeks@netflix.com>

Closes #330 from rdblue/PARQUET-384-add-dictionary-filtering and squashes the following commits:

ff89424 [Ryan Blue] PARQUET-384: Add a cache to DictionaryPageReader.
1f6861c [Ryan Blue] PARQUET-384: Use ParquetFileReader to initialize readers.
21ef4b6 [Daniel Weeks] PARQUET-384: Add dictionary row group filter.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/4b1ff8f4
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/4b1ff8f4
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/4b1ff8f4

Branch: refs/heads/master
Commit: 4b1ff8f4b9dfa0ccb064ef286cf2953bfb2c492d
Parents: 1f91c79
Author: Ryan Blue <blue@apache.org>
Authored: Wed Mar 9 13:20:37 2016 -0800
Committer: Ryan Blue <blue@apache.org>
Committed: Wed Mar 9 13:20:37 2016 -0800

----------------------------------------------------------------------
 .../column/page/DictionaryPageReadStore.java    |  36 ++
 .../parquet/filter2/compat/RowGroupFilter.java  |  37 +-
 .../dictionarylevel/DictionaryFilter.java       | 356 +++++++++++++++++
 .../converter/ParquetMetadataConverter.java     |  47 ++-
 .../hadoop/ColumnChunkPageReadStore.java        |  10 +-
 .../parquet/hadoop/DictionaryPageReader.java    | 110 ++++++
 .../hadoop/InternalParquetRecordReader.java     |  39 +-
 .../parquet/hadoop/ParquetFileReader.java       | 320 ++++++++++++---
 .../apache/parquet/hadoop/ParquetReader.java    |  17 +-
 .../parquet/hadoop/ParquetRecordReader.java     |  71 ++--
 .../dictionarylevel/DictionaryFilterTest.java   | 387 +++++++++++++++++++
 11 files changed, 1286 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java
new file mode 100644
index 0000000..a0cd5f5
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DictionaryPageReadStore.java
@@ -0,0 +1,36 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.page;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Interface to read dictionary pages for all the columns of a row group
+ */
+public interface DictionaryPageReadStore {
+
+  /**
+   * Returns a {@link DictionaryPage} for the given column descriptor.
+   * The dictionary page bytes are uncompressed.
+   *
+   * @param descriptor the descriptor of the column
+   * @return the DictionaryPage for that column, or null if there isn't one
+   */
+  DictionaryPage readDictionaryPage(ColumnDescriptor descriptor);
+}
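
As a rough sketch of how a caller might consume this interface (mirroring what the dictionary filter later in this patch does; the `DictionaryDump` helper class is purely illustrative and assumes a BINARY column):

    import org.apache.parquet.column.ColumnDescriptor;
    import org.apache.parquet.column.Dictionary;
    import org.apache.parquet.column.page.DictionaryPage;
    import org.apache.parquet.column.page.DictionaryPageReadStore;
    import org.apache.parquet.io.api.Binary;

    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    class DictionaryDump {
      // collects the distinct values of a BINARY column's dictionary, or returns null
      // if the column chunk is not dictionary-encoded
      static Set<Binary> binaryDictionary(DictionaryPageReadStore dictionaries,
                                          ColumnDescriptor descriptor) throws IOException {
        DictionaryPage page = dictionaries.readDictionaryPage(descriptor);
        if (page == null) {
          return null; // no dictionary page for this column chunk
        }
        Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
        Set<Binary> values = new HashSet<Binary>();
        for (int i = 0; i <= dict.getMaxId(); i++) {
          values.add(dict.decodeToBinary(i));
        }
        return values;
      }
    }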

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index d85a231..fd74799 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -19,14 +19,17 @@
 package org.apache.parquet.filter2.compat;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
 import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
+import org.apache.parquet.filter2.dictionarylevel.DictionaryFilter;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator;
 import org.apache.parquet.filter2.statisticslevel.StatisticsFilter;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.schema.MessageType;
 
@@ -40,15 +43,37 @@ import static org.apache.parquet.Preconditions.checkNotNull;
 public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
   private final List<BlockMetaData> blocks;
   private final MessageType schema;
+  private final List<FilterLevel> levels;
+  private final ParquetFileReader reader;
+
+  public enum FilterLevel {
+    STATISTICS,
+    DICTIONARY
+  }
 
   public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
     checkNotNull(filter, "filter");
     return filter.accept(new RowGroupFilter(blocks, schema));
   }
 
+  public static List<BlockMetaData> filterRowGroups(List<FilterLevel> levels, Filter filter, List<BlockMetaData> blocks, ParquetFileReader reader) {
+    checkNotNull(filter, "filter");
+    return filter.accept(new RowGroupFilter(levels, blocks, reader));
+  }
+
+  @Deprecated
   private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
     this.blocks = checkNotNull(blocks, "blocks");
     this.schema = checkNotNull(schema, "schema");
+    this.levels = Collections.singletonList(FilterLevel.STATISTICS);
+    this.reader = null;
+  }
+
+  private RowGroupFilter(List<FilterLevel> levels, List<BlockMetaData> blocks, ParquetFileReader reader) {
+    this.blocks = checkNotNull(blocks, "blocks");
+    this.reader = checkNotNull(reader, "reader");
+    this.schema = reader.getFileMetaData().getSchema();
+    this.levels = levels;
   }
 
   @Override
@@ -61,7 +86,17 @@ public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
     List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();
 
     for (BlockMetaData block : blocks) {
-      if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
+      boolean drop = false;
+
+      if(levels.contains(FilterLevel.STATISTICS)) {
+        drop = StatisticsFilter.canDrop(filterPredicate, block.getColumns());
+      }
+
+      if(!drop && levels.contains(FilterLevel.DICTIONARY)) {
+        drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block));
+      }
+
+      if(!drop) {
         filteredBlocks.add(block);
       }
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
new file mode 100644
index 0000000..6235c20
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.filter2.dictionarylevel;
+
+import org.apache.parquet.Log;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.*;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+
+/**
+ * Applies filters based on the contents of column dictionaries.
+ */
+public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
+
+  private static final Log LOG = Log.getLog(DictionaryFilter.class);
+  private static final boolean BLOCK_MIGHT_MATCH = false;
+  private static final boolean BLOCK_CANNOT_MATCH = true;
+
+  public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, DictionaryPageReadStore dictionaries) {
+    checkNotNull(pred, "pred");
+    checkNotNull(columns, "columns");
+    return pred.accept(new DictionaryFilter(columns, dictionaries));
+  }
+
+  private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+  private final DictionaryPageReadStore dictionaries;
+
+  private DictionaryFilter(List<ColumnChunkMetaData> columnsList, DictionaryPageReadStore dictionaries) {
+    for (ColumnChunkMetaData chunk : columnsList) {
+      columns.put(chunk.getPath(), chunk);
+    }
+
+    this.dictionaries = dictionaries;
+  }
+
+  private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+    ColumnChunkMetaData c = columns.get(columnPath);
+    checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
+    return c;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T extends Comparable<T>> Set<T> expandDictionary(ColumnChunkMetaData meta) throws IOException {
+    ColumnDescriptor col = new ColumnDescriptor(meta.getPath().toArray(), meta.getType(), -1, -1);
+    DictionaryPage page = dictionaries.readDictionaryPage(col);
+
+    // the chunk may not be dictionary-encoded
+    if (page == null) {
+      return null;
+    }
+
+    Dictionary dict = page.getEncoding().initDictionary(col, page);
+
+    Set dictSet = new HashSet<T>();
+
+    for (int i=0; i<=dict.getMaxId(); i++) {
+      switch(meta.getType()) {
+        case BINARY: dictSet.add(dict.decodeToBinary(i));
+          break;
+        case INT32: dictSet.add(dict.decodeToInt(i));
+          break;
+        case INT64: dictSet.add(dict.decodeToLong(i));
+          break;
+        case FLOAT: dictSet.add(dict.decodeToFloat(i));
+          break;
+        case DOUBLE: dictSet.add(dict.decodeToDouble(i));
+          break;
+        default:
+          LOG.warn("Unknown dictionary type: " + meta.getType());
+      }
+    }
+
+    return (Set<T>) dictSet;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
+    Column<T> filterColumn = eq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    T value = eq.getValue();
+
+    filterColumn.getColumnPath();
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet != null && !dictSet.contains(value)) {
+        return BLOCK_CANNOT_MATCH;
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH; // cannot drop the row group based on this dictionary
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
+    Column<T> filterColumn = notEq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    T value = notEq.getValue();
+
+    filterColumn.getColumnPath();
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet != null && dictSet.size() == 1 && dictSet.contains(value)) {
+        return BLOCK_CANNOT_MATCH;
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
+    Column<T> filterColumn = lt.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    T value = lt.getValue();
+
+    filterColumn.getColumnPath();
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet == null) {
+        return BLOCK_MIGHT_MATCH;
+      }
+
+      for(T entry : dictSet) {
+        if(value.compareTo(entry) > 0) {
+          return BLOCK_MIGHT_MATCH;
+        }
+      }
+
+      return BLOCK_CANNOT_MATCH;
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
+    Column<T> filterColumn = ltEq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    T value = ltEq.getValue();
+
+    filterColumn.getColumnPath();
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet == null) {
+        return BLOCK_MIGHT_MATCH;
+      }
+
+      for(T entry : dictSet) {
+        if(value.compareTo(entry) >= 0) {
+          return BLOCK_MIGHT_MATCH;
+        }
+      }
+
+      return BLOCK_CANNOT_MATCH;
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
+    Column<T> filterColumn = gt.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    T value = gt.getValue();
+
+    filterColumn.getColumnPath();
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet == null) {
+        return BLOCK_MIGHT_MATCH;
+      }
+
+      for(T entry : dictSet) {
+        if(value.compareTo(entry) < 0) {
+          return BLOCK_MIGHT_MATCH;
+        }
+      }
+
+      return BLOCK_CANNOT_MATCH;
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
+    Column<T> filterColumn = gtEq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    T value = gtEq.getValue();
+
+    filterColumn.getColumnPath();
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet == null) {
+        return BLOCK_MIGHT_MATCH;
+      }
+
+      for(T entry : dictSet) {
+        if(value.compareTo(entry) <= 0) {
+          return BLOCK_MIGHT_MATCH;
+        }
+      }
+
+      return BLOCK_CANNOT_MATCH;
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public Boolean visit(And and) {
+    return and.getLeft().accept(this) || and.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Or or) {
+    return or.getLeft().accept(this) && or.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Not not) {
+    throw new IllegalArgumentException(
+        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> udp) {
+    throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> udp) {
+    throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+  }
+
+  @SuppressWarnings("deprecation")
+  private static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) {
+    // without EncodingStats, fall back to testing the encoding list
+    Set<Encoding> encodings = new HashSet<Encoding>(meta.getEncodings());
+    if (encodings.remove(Encoding.PLAIN_DICTIONARY)) {
+      // if remove returned true, PLAIN_DICTIONARY was present, which means at
+      // least one page was dictionary encoded and 1.0 encodings are used
+
+      // RLE and BIT_PACKED are only used for repetition or definition levels
+      encodings.remove(Encoding.RLE);
+      encodings.remove(Encoding.BIT_PACKED);
+
+      if (encodings.isEmpty()) {
+        return false; // no encodings other than dictionary or rep/def levels
+      }
+
+      return true;
+
+    } else {
+      // if PLAIN_DICTIONARY wasn't present, then either the column is not
+      // dictionary-encoded, or the 2.0 encoding, RLE_DICTIONARY, was used.
+      // with 2.0 encodings, we cannot tell whether any page fell back
+      // without page encoding stats, so assume some did
+      return true;
+    }
+  }
+}
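
For reference, a minimal sketch of driving the new filter by hand against each row group of an open file (the path and column name are hypothetical; in normal use `RowGroupFilter` does this, as shown in the previous hunk). Because the visitor answers "can this block be dropped?", `And` combines its children with || and `Or` with &&, and predicates must first pass through `LogicalInverseRewriter` so that not() never reaches the visitor:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.filter2.dictionarylevel.DictionaryFilter;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.io.api.Binary;

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    public class ManualDictionaryFiltering {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ParquetFileReader reader = ParquetFileReader.open(conf, new Path("/tmp/events.parquet"));
        // rewrite first: the visitor throws if it sees a not() node
        FilterPredicate pred = LogicalInverseRewriter.rewrite(
            eq(binaryColumn("event_type"), Binary.fromString("click")));
        for (BlockMetaData block : reader.getRowGroups()) {
          boolean drop = DictionaryFilter.canDrop(
              pred, block.getColumns(), reader.getDictionaryReader(block));
          System.out.println("row group at " + block.getStartingPos() + " droppable: " + drop);
        }
        reader.close();
      }
    }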

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index b373bfb..48f295e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -429,13 +429,14 @@ public class ParquetMetadataConverter {
   private static interface MetadataFilterVisitor<T, E extends Throwable> {
     T visit(NoFilter filter) throws E;
     T visit(SkipMetadataFilter filter) throws E;
-    T visit(RangeMetadataFilter filter) throws E;
+    T visit(OffsetMetadataFilter filter) throws E;
   }
 
   public abstract static class MetadataFilter {
     private MetadataFilter() {}
     abstract <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E;
   }
+
   /**
    * [ startOffset, endOffset )
    * @param startOffset
@@ -445,6 +446,15 @@ public class ParquetMetadataConverter {
   public static MetadataFilter range(long startOffset, long endOffset) {
     return new RangeMetadataFilter(startOffset, endOffset);
   }
+
+  public static MetadataFilter offsets(long... offsets) {
+    Set<Long> set = new HashSet<Long>();
+    for (long offset : offsets) {
+      set.add(offset);
+    }
+    return new OffsetListMetadataFilter(set);
+  }
+
   private static final class NoFilter extends MetadataFilter {
     private NoFilter() {}
     @Override
@@ -467,39 +477,66 @@ public class ParquetMetadataConverter {
       return "SKIP_ROW_GROUPS";
     }
   }
+
+  interface OffsetMetadataFilter {
+    boolean contains(long offset);
+  }
+
   /**
    * [ startOffset, endOffset )
    * @author Julien Le Dem
    */
   // Visible for testing
-  static final class RangeMetadataFilter extends MetadataFilter {
+  static final class RangeMetadataFilter extends MetadataFilter implements OffsetMetadataFilter {
     final long startOffset;
     final long endOffset;
+
     RangeMetadataFilter(long startOffset, long endOffset) {
       super();
       this.startOffset = startOffset;
       this.endOffset = endOffset;
     }
+
     @Override
     <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
       return visitor.visit(this);
     }
-    boolean contains(long offset) {
+
+    @Override
+    public boolean contains(long offset) {
       return offset >= this.startOffset && offset < this.endOffset;
     }
+
     @Override
     public String toString() {
       return "range(s:" + startOffset + ", e:" + endOffset + ")";
     }
   }
 
+  static final class OffsetListMetadataFilter extends MetadataFilter implements OffsetMetadataFilter {
+    private final Set<Long> offsets;
+
+    public OffsetListMetadataFilter(Set<Long> offsets) {
+      this.offsets = offsets;
+    }
+
+    public boolean contains(long offset) {
+      return offsets.contains(offset);
+    }
+
+    @Override
+    <T, E extends Throwable> T accept(MetadataFilterVisitor<T, E> visitor) throws E {
+      return visitor.visit(this);
+    }
+  }
+
   @Deprecated
   public ParquetMetadata readParquetMetadata(InputStream from) throws IOException {
     return readParquetMetadata(from, NO_FILTER);
   }
 
   // Visible for testing
-  static FileMetaData filterFileMetaData(FileMetaData metaData, RangeMetadataFilter filter) {
+  static FileMetaData filterFileMetaData(FileMetaData metaData, OffsetMetadataFilter filter) {
     List<RowGroup> rowGroups = metaData.getRow_groups();
     List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
     for (RowGroup rowGroup : rowGroups) {
@@ -544,7 +581,7 @@ public class ParquetMetadataConverter {
       }
 
       @Override
-      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
+      public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
         return filterFileMetaData(readFileMetaData(from), filter);
       }
     });

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index af06747..b0d0d30 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -31,6 +31,7 @@ import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
 import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.hadoop.CodecFactory.BytesDecompressor;
@@ -42,7 +43,7 @@ import org.apache.parquet.io.ParquetDecodingException;
  * in our format: columns, chunks, and pages
  *
  */
-class ColumnChunkPageReadStore implements PageReadStore {
+class ColumnChunkPageReadStore implements PageReadStore, DictionaryPageReadStore {
   private static final Log LOG = Log.getLog(ColumnChunkPageReadStore.class);
 
   /**
@@ -136,7 +137,7 @@ class ColumnChunkPageReadStore implements PageReadStore {
             compressedDictionaryPage.getDictionarySize(),
             compressedDictionaryPage.getEncoding());
       } catch (IOException e) {
-        throw new RuntimeException(e); // TODO: cleanup
+        throw new ParquetDecodingException("Could not decompress dictionary page", e);
       }
     }
   }
@@ -161,6 +162,11 @@ class ColumnChunkPageReadStore implements PageReadStore {
     return readers.get(path);
   }
 
+  @Override
+  public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) {
+    return readers.get(descriptor).readDictionaryPage();
+  }
+
   void addColumn(ColumnDescriptor path, ColumnChunkPageReader reader) {
     if (readers.put(path, reader) != null) {
       throw new RuntimeException(path+ " was added twice");

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
new file mode 100644
index 0000000..cb0d5e7
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -0,0 +1,110 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import org.apache.parquet.Strings;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.ParquetDecodingException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+
+/**
+ * A {@link DictionaryPageReadStore} implementation that reads dictionaries from
+ * an open {@link ParquetFileReader}.
+ *
+ * This implementation will delegate dictionary reads to a
+ * {@link ColumnChunkPageReadStore} to avoid extra reads after a row group has
+ * been loaded into memory.
+ */
+class DictionaryPageReader implements DictionaryPageReadStore {
+
+  private final ParquetFileReader reader;
+  private final Map<String, ColumnChunkMetaData> columns;
+  private final Map<String, DictionaryPage> cache = new HashMap<String, DictionaryPage>();
+  private ColumnChunkPageReadStore rowGroup = null;
+
+  DictionaryPageReader(ParquetFileReader reader, BlockMetaData block) {
+    this.reader = reader;
+    this.columns = new HashMap<String, ColumnChunkMetaData>();
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      columns.put(column.getPath().toDotString(), column);
+    }
+  }
+
+  /**
+   * Sets this reader's row group's page store. When a row group is set, this
+   * reader will delegate to that row group to return dictionary pages. This
+   * avoids seeking and re-reading dictionary bytes after this reader's row
+   * group is loaded into memory.
+   *
+   * @param rowGroup a ColumnChunkPageReadStore for this reader's row group
+   */
+  void setRowGroup(ColumnChunkPageReadStore rowGroup) {
+    this.rowGroup = rowGroup;
+  }
+
+  @Override
+  public DictionaryPage readDictionaryPage(ColumnDescriptor descriptor) {
+    if (rowGroup != null) {
+      // if the row group has already been read, use that dictionary
+      return rowGroup.readDictionaryPage(descriptor);
+    }
+
+    String dotPath = Strings.join(descriptor.getPath(), ".");
+    ColumnChunkMetaData column = columns.get(dotPath);
+    if (column == null) {
+      throw new ParquetDecodingException(
+          "Cannot load dictionary, unknown column: " + dotPath);
+    }
+
+    if (cache.containsKey(dotPath)) {
+      return cache.get(dotPath);
+    }
+
+    try {
+      synchronized (cache) {
+        // check the cache again in case this thread waited on another reading the same page
+        if (!cache.containsKey(dotPath)) {
+          DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
+          cache.put(dotPath, dict);
+        }
+      }
+
+      return cache.get(dotPath);
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+          "Failed to read dictionary", e);
+    }
+  }
+
+  private boolean hasDictionaryPage(ColumnChunkMetaData column) {
+    Set<Encoding> encodings = column.getEncodings();
+    return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY));
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index c1bd037..f74e57c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,22 +22,18 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 
 import org.apache.parquet.Log;
-import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.hadoop.api.InitContext;
 import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ColumnIOFactory;
@@ -45,9 +41,7 @@ import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
-import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
 
 import static java.lang.String.format;
 import static org.apache.parquet.Log.DEBUG;
@@ -81,7 +75,6 @@ class InternalParquetRecordReader<T> {
 
   private long totalCountLoadedSoFar = 0;
 
-  private Path file;
   private UnmaterializableRecordCounter unmaterializableRecordCounter;
 
   /**
@@ -163,46 +156,26 @@ class InternalParquetRecordReader<T> {
     return (float) current / total;
   }
 
-  public void initialize(MessageType fileSchema,
-      FileMetaData parquetFileMetadata,
-      Path file, List<BlockMetaData> blocks, Configuration configuration)
+  public void initialize(ParquetFileReader reader, Configuration configuration)
       throws IOException {
     // initialize a ReadContext for this file
+    this.reader = reader;
+    FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData();
+    this.fileSchema = parquetFileMetadata.getSchema();
     Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData();
     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
         configuration, toSetMultiMap(fileMetadata), fileSchema));
     this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
     this.requestedSchema = readContext.getRequestedSchema();
-    this.fileSchema = fileSchema;
-    this.file = file;
     this.columnCount = requestedSchema.getPaths().size();
     this.recordConverter = readSupport.prepareForRead(
         configuration, fileMetadata, fileSchema, readContext);
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
-    List<ColumnDescriptor> columns = requestedSchema.getColumns();
-    reader = new ParquetFileReader(configuration, parquetFileMetadata, file, blocks, columns);
-    for (BlockMetaData block : blocks) {
-      total += block.getRowCount();
-    }
+    this.total = reader.getRecordCount();
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
     LOG.info("RecordReader initialized will read a total of " + total + " records.");
   }
 
-  private boolean contains(GroupType group, String[] path, int index) {
-    if (index == path.length) {
-      return false;
-    }
-    if (group.containsField(path[index])) {
-      Type type = group.getType(path[index]);
-      if (type.isPrimitive()) {
-        return index + 1 == path.length;
-      } else {
-        return contains(type.asGroupType(), path, index + 1);
-      }
-    }
-    return false;
-  }
-
   public boolean nextKeyValue() throws IOException, InterruptedException {
     boolean recordFound = false;
 
@@ -240,7 +213,7 @@ class InternalParquetRecordReader<T> {
 
         if (DEBUG) LOG.debug("read value: " + currentValue);
       } catch (RuntimeException e) {
-        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
+        throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e);
       }
     }
     return true;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 55ed5ee..3d7b499 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -20,6 +20,8 @@ package org.apache.parquet.hadoop;
 
 import static org.apache.parquet.Log.DEBUG;
 import static org.apache.parquet.bytes.BytesUtils.readIntLittleEndian;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.DICTIONARY;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.STATISTICS;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
@@ -56,6 +58,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.compat.RowGroupFilter;
 import org.apache.parquet.hadoop.util.CompatibilityUtil;
 
 import org.apache.parquet.Log;
@@ -422,58 +428,74 @@ public class ParquetFileReader implements Closeable {
    */
   public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
     FileSystem fileSystem = file.getPath().getFileSystem(configuration);
-    FSDataInputStream f = fileSystem.open(file.getPath());
+    FSDataInputStream in = fileSystem.open(file.getPath());
     try {
-      long l = file.getLen();
-      if (Log.DEBUG) {
-        LOG.debug("File length " + l);
-      }
-      int FOOTER_LENGTH_SIZE = 4;
-      if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
-        throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)");
-      }
-      long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length;
-      if (Log.DEBUG) {
-        LOG.debug("reading footer index at " + footerLengthIndex);
-      }
-
-      f.seek(footerLengthIndex);
-      int footerLength = readIntLittleEndian(f);
-      byte[] magic = new byte[MAGIC.length];
-      f.readFully(magic);
-      if (!Arrays.equals(MAGIC, magic)) {
-        throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
-      }
-      long footerIndex = footerLengthIndex - footerLength;
-      if (Log.DEBUG) {
-        LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
-      }
-      if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
-        throw new RuntimeException("corrupted file: the footer index is not within the file");
-      }
-      f.seek(footerIndex);
-      return converter.readParquetMetadata(f, filter);
+      return readFooter(file, in, filter);
     } finally {
-      f.close();
+      in.close();
     }
   }
 
-  static ParquetFileReader open(Configuration conf, Path file) throws IOException {
-    ParquetMetadata footer = readFooter(conf, file, NO_FILTER);
-    return new ParquetFileReader(conf, footer.getFileMetaData(), file,
-        footer.getBlocks(), footer.getFileMetaData().getSchema().getColumns());
+  private static final ParquetMetadata readFooter(FileStatus file, FSDataInputStream f, MetadataFilter filter) throws IOException {
+    long l = file.getLen();
+    if (Log.DEBUG) {
+      LOG.debug("File length " + l);
+    }
+    int FOOTER_LENGTH_SIZE = 4;
+    if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
+      throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)");
+    }
+    long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length;
+    if (Log.DEBUG) {
+      LOG.debug("reading footer index at " + footerLengthIndex);
+    }
+
+    f.seek(footerLengthIndex);
+    int footerLength = readIntLittleEndian(f);
+    byte[] magic = new byte[MAGIC.length];
+    f.readFully(magic);
+    if (!Arrays.equals(MAGIC, magic)) {
+      throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
+    }
+    long footerIndex = footerLengthIndex - footerLength;
+    if (Log.DEBUG) {
+      LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
+    }
+    if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) {
+      throw new RuntimeException("corrupted file: the footer index is not within the file");
+    }
+    f.seek(footerIndex);
+    return converter.readParquetMetadata(f, filter);
+  }
+
+  public static ParquetFileReader open(Configuration conf, Path file) throws IOException {
+    return new ParquetFileReader(conf, file);
+  }
+
+  public static ParquetFileReader open(Configuration conf, Path file, MetadataFilter filter) throws IOException {
+    return new ParquetFileReader(conf, file, filter);
+  }
+
+  public static ParquetFileReader open(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
+    return new ParquetFileReader(conf, file, footer);
   }
 
   private final CodecFactory codecFactory;
-  private final List<BlockMetaData> blocks;
   private final FSDataInputStream f;
-  private final Path filePath;
+  private final FileStatus fileStatus;
   private final Map<ColumnPath, ColumnDescriptor> paths = new HashMap<ColumnPath, ColumnDescriptor>();
-  private final FileMetaData fileMetaData;
-  private final String createdBy;
+  private final FileMetaData fileMetaData; // may be null
   private final ByteBufferAllocator allocator;
+  private final Configuration conf;
+
+  // not final. in some cases, this may be lazily loaded for backward-compat.
+  private ParquetMetadata footer;
+  // blocks can be filtered after they are read (or set in the constructor)
+  private List<BlockMetaData> blocks;
 
   private int currentBlock = 0;
+  private ColumnChunkPageReadStore currentRowGroup = null;
+  private DictionaryPageReader nextDictionaryReader = null;
 
   /**
    * @deprecated use @link{ParquetFileReader(Configuration configuration, FileMetaData fileMetaData,
@@ -490,14 +512,15 @@ public class ParquetFileReader implements Closeable {
    * @param columns the columns to read (their path)
    * @throws IOException if the file can not be opened
    */
+  @Deprecated
   public ParquetFileReader(
       Configuration configuration, FileMetaData fileMetaData,
       Path filePath, List<BlockMetaData> blocks, List<ColumnDescriptor> columns) throws IOException {
-    this.filePath = filePath;
+    this.conf = configuration;
     this.fileMetaData = fileMetaData;
-    this.createdBy = fileMetaData == null ? null : fileMetaData.getCreatedBy();
     FileSystem fs = filePath.getFileSystem(configuration);
     this.f = fs.open(filePath);
+    this.fileStatus = fs.getFileStatus(filePath);
     this.blocks = blocks;
     for (ColumnDescriptor col : columns) {
       paths.put(ColumnPath.get(col.getPath()), col);
@@ -508,6 +531,111 @@ public class ParquetFileReader implements Closeable {
     this.allocator = new HeapByteBufferAllocator();
   }
 
+  /**
+   * @param configuration the Hadoop Configuration
+   * @param file Path to a parquet file
+   * @throws IOException if the file can not be opened
+   */
+  private ParquetFileReader(Configuration configuration, Path file) throws IOException {
+    this(configuration, file, NO_FILTER);
+  }
+
+  /**
+   * @param conf the Hadoop Configuration
+   * @param file Path to a parquet file
+   * @param filter a {@link MetadataFilter} for selecting row groups
+   * @throws IOException if the file can not be opened
+   */
+  public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) throws IOException {
+    this.conf = conf;
+    FileSystem fs = file.getFileSystem(conf);
+    this.fileStatus = fs.getFileStatus(file);
+    this.f = fs.open(file);
+    this.footer = readFooter(fileStatus, f, filter);
+    this.fileMetaData = footer.getFileMetaData();
+    this.blocks = footer.getBlocks();
+    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    // the page size parameter isn't meaningful when only using
+    // the codec factory to get decompressors
+    this.codecFactory = new CodecFactory(conf, 0);
+    this.allocator = new HeapByteBufferAllocator();
+  }
+
+  /**
+   * @param conf the Hadoop Configuration
+   * @param file Path to a parquet file
+   * @param footer a {@link ParquetMetadata} footer already read from the file
+   * @throws IOException if the file can not be opened
+   */
+  public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) throws IOException {
+    this.conf = conf;
+    FileSystem fs = file.getFileSystem(conf);
+    this.fileStatus = fs.getFileStatus(file);
+    this.f = fs.open(file);
+    this.footer = footer;
+    this.fileMetaData = footer.getFileMetaData();
+    this.blocks = footer.getBlocks();
+    for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
+      paths.put(ColumnPath.get(col.getPath()), col);
+    }
+    // the page size parameter isn't meaningful when only using
+    // the codec factory to get decompressors
+    this.codecFactory = new CodecFactory(conf, 0);
+    this.allocator = new HeapByteBufferAllocator();
+  }
+
+  public ParquetMetadata getFooter() {
+    if (footer == null) {
+      try {
+        // don't read the row groups because this.blocks is always set
+        this.footer = readFooter(fileStatus, f, SKIP_ROW_GROUPS);
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Unable to read file footer", e);
+      }
+    }
+    return footer;
+  }
+
+  public FileMetaData getFileMetaData() {
+    if (fileMetaData != null) {
+      return fileMetaData;
+    }
+    return getFooter().getFileMetaData();
+  }
+
+  public long getRecordCount() {
+    long total = 0;
+    for (BlockMetaData block : blocks) {
+      total += block.getRowCount();
+    }
+    return total;
+  }
+
+  public Path getPath() {
+    return fileStatus.getPath();
+  }
+
+  void filterRowGroups(FilterCompat.Filter filter) throws IOException {
+    // set up data filters based on configured levels
+    List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>();
+
+    if (conf.getBoolean("parquet.filter.statistics.enabled", true)) {
+      levels.add(STATISTICS);
+    }
+
+    if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) {
+      levels.add(DICTIONARY);
+    }
+
+    this.blocks = RowGroupFilter.filterRowGroups(levels, filter, blocks, this);
+  }
+
+  public List<BlockMetaData> getRowGroups() {
+    return blocks;
+  }
+
   public void appendTo(ParquetFileWriter writer) throws IOException {
     writer.appendRowGroups(f, blocks, true);
   }
@@ -525,7 +653,7 @@ public class ParquetFileReader implements Closeable {
     if (block.getRowCount() == 0) {
       throw new RuntimeException("Illegal row group of 0 rows");
     }
-    ColumnChunkPageReadStore columnChunkPageReadStore = new ColumnChunkPageReadStore(block.getRowCount());
+    this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount());
     // prepare the list of consecutive chunks to read them in one scan
     List<ConsecutiveChunkList> allChunks = new ArrayList<ConsecutiveChunkList>();
     ConsecutiveChunkList currentChunks = null;
@@ -547,19 +675,113 @@ public class ParquetFileReader implements Closeable {
     for (ConsecutiveChunkList consecutiveChunks : allChunks) {
       final List<Chunk> chunks = consecutiveChunks.readAll(f);
       for (Chunk chunk : chunks) {
-        columnChunkPageReadStore.addColumn(chunk.descriptor.col, chunk.readAllPages());
+        currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
       }
     }
+
+    // avoid re-reading bytes if the dictionary reader is used after this call
+    if (nextDictionaryReader != null) {
+      nextDictionaryReader.setRowGroup(currentRowGroup);
+    }
+
+    advanceToNextBlock();
+
+    return currentRowGroup;
+  }
+
+  public boolean skipNextRowGroup() {
+    return advanceToNextBlock();
+  }
+
+  private boolean advanceToNextBlock() {
+    if (currentBlock == blocks.size()) {
+      return false;
+    }
+
+    // update the current block; its dictionary reader is created lazily by getNextDictionaryReader()
     ++currentBlock;
-    return columnChunkPageReadStore;
+    this.nextDictionaryReader = null;
+
+    return true;
   }
 
+  /**
+   * Returns a {@link DictionaryPageReadStore} for the row group that would be
+   * returned by calling {@link #readNextRowGroup()} or skipped by calling
+   * {@link #skipNextRowGroup()}.
+   *
+   * @return a DictionaryPageReadStore for the next row group
+   */
+  public DictionaryPageReadStore getNextDictionaryReader() {
+    if (nextDictionaryReader == null && currentBlock < blocks.size()) {
+      this.nextDictionaryReader = getDictionaryReader(blocks.get(currentBlock));
+    }
+    return nextDictionaryReader;
+  }
+
+  public DictionaryPageReader getDictionaryReader(BlockMetaData block) {
+    return new DictionaryPageReader(this, block);
+  }
+
+  /**
+   * Reads and decompresses a dictionary page for the given column chunk.
+   *
+   * Returns null if the given column chunk has no dictionary page.
+   *
+   * @param meta a column's ColumnChunkMetaData to read the dictionary from
+   * @return an uncompressed DictionaryPage or null
+   * @throws IOException if there is an error reading the dictionary page
+   */
+  DictionaryPage readDictionary(ColumnChunkMetaData meta) throws IOException {
+    if (!meta.getEncodings().contains(Encoding.PLAIN_DICTIONARY) &&
+        !meta.getEncodings().contains(Encoding.RLE_DICTIONARY)) {
+      return null;
+    }
+
+    // TODO: this should use getDictionaryPageOffset() but it isn't reliable.
+    if (f.getPos() != meta.getStartingPos()) {
+      f.seek(meta.getStartingPos());
+    }
+
+    PageHeader pageHeader = Util.readPageHeader(f);
+    if (!pageHeader.isSetDictionary_page_header()) {
+      return null; // TODO: should this complain?
+    }
+
+    DictionaryPage compressedPage = readCompressedDictionary(pageHeader, f);
+    BytesDecompressor decompressor = codecFactory.getDecompressor(meta.getCodec());
+
+    return new DictionaryPage(
+        decompressor.decompress(compressedPage.getBytes(), compressedPage.getUncompressedSize()),
+        compressedPage.getDictionarySize(),
+        compressedPage.getEncoding());
+  }
+
+  private static DictionaryPage readCompressedDictionary(
+      PageHeader pageHeader, FSDataInputStream fin) throws IOException {
+    DictionaryPageHeader dictHeader = pageHeader.getDictionary_page_header();
 
+    int uncompressedPageSize = pageHeader.getUncompressed_page_size();
+    int compressedPageSize = pageHeader.getCompressed_page_size();
+
+    byte [] dictPageBytes = new byte[compressedPageSize];
+    fin.readFully(dictPageBytes);
+
+    BytesInput bin = BytesInput.from(dictPageBytes);
+
+    return new DictionaryPage(
+        bin, uncompressedPageSize, dictHeader.getNum_values(),
+        converter.getEncoding(dictHeader.getEncoding()));
+  }
 
   @Override
   public void close() throws IOException {
-    f.close();
-    this.codecFactory.release();
+    if (f != null) {
+      f.close();
+    }
+    if (codecFactory != null) {
+      codecFactory.release();
+    }
   }
 
   /**
@@ -622,7 +844,7 @@ public class ParquetFileReader implements Closeable {
                     dataHeaderV1.getNum_values(),
                     uncompressedPageSize,
                     fromParquetStatistics(
-                        createdBy,
+                        getFileMetaData().getCreatedBy(),
                         dataHeaderV1.getStatistics(),
                         descriptor.col.getType()),
                     converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
@@ -645,7 +867,7 @@ public class ParquetFileReader implements Closeable {
                     this.readAsBytesInput(dataSize),
                     uncompressedPageSize,
                     fromParquetStatistics(
-                        createdBy,
+                        getFileMetaData().getCreatedBy(),
                         dataHeaderV2.getStatistics(),
                         descriptor.col.getType()),
                     dataHeaderV2.isIs_compressed()
@@ -664,7 +886,7 @@ public class ParquetFileReader implements Closeable {
         // Would be nice to have a CorruptParquetFileException or something as a subclass?
         throw new IOException(
             "Expected " + descriptor.metadata.getValueCount() + " values in column chunk at " +
-            filePath + " offset " + descriptor.metadata.getFirstDataPageOffset() +
+            getPath() + " offset " + descriptor.metadata.getFirstDataPageOffset() +
             " but got " + valuesCountReadSoFar + " values instead over " + pagesInChunk.size()
             + " pages ending at file offset " + (descriptor.fileOffset + pos()));
       }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index 7cbb04a..ff9c811 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -35,11 +35,8 @@ import org.apache.parquet.Preconditions;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
-import org.apache.parquet.filter2.compat.RowGroupFilter;
 import org.apache.parquet.hadoop.api.ReadSupport;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
-import org.apache.parquet.schema.MessageType;
 
 /**
  * Read records from a Parquet file.
@@ -141,17 +138,15 @@ public class ParquetReader<T> implements Closeable {
     if (footersIterator.hasNext()) {
       Footer footer = footersIterator.next();
 
-      List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
+      ParquetFileReader fileReader = ParquetFileReader.open(
+          conf, footer.getFile(), footer.getParquetMetadata());
 
-      MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();
-
-      List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
-          filter, blocks, fileSchema);
+      // apply data filters
+      fileReader.filterRowGroups(filter);
 
       reader = new InternalParquetRecordReader<T>(readSupport, filter);
-      reader.initialize(fileSchema,
-          footer.getParquetMetadata().getFileMetaData(),
-          footer.getFile(), filteredBlocks, conf);
+
+      reader.initialize(fileReader, conf);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 1558fc0..eae3b4e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -18,10 +18,9 @@
  */
 package org.apache.parquet.hadoop;
 
-import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel.*;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.offsets;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
-import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
 import static org.apache.parquet.hadoop.ParquetInputFormat.SPLIT_FILES;
 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 
@@ -47,15 +46,15 @@ import org.apache.parquet.column.Encoding;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
+import org.apache.parquet.filter2.compat.RowGroupFilter.FilterLevel;
+import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.schema.MessageType;
 
 /**
  * Reads the records from a block of a Parquet file
@@ -154,52 +153,38 @@ public class ParquetRecordReader<T> extends RecordReader<Void, T> {
   private void initializeInternalReader(ParquetInputSplit split, Configuration configuration) throws IOException {
     Path path = split.getPath();
     long[] rowGroupOffsets = split.getRowGroupOffsets();
-    List<BlockMetaData> filteredBlocks;
-    ParquetMetadata footer;
+
     // if task.side.metadata is set, rowGroupOffsets is null
-    if (rowGroupOffsets == null) {
-      // then we need to apply the predicate push down filter
-      footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
-      MessageType fileSchema = footer.getFileMetaData().getSchema();
-      Filter filter = getFilter(configuration);
-      filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
-    } else {
-      // otherwise we find the row groups that were selected on the client
-      footer = readFooter(configuration, path, NO_FILTER);
-      Set<Long> offsets = new HashSet<Long>();
-      for (long offset : rowGroupOffsets) {
-        offsets.add(offset);
-      }
-      filteredBlocks = new ArrayList<BlockMetaData>();
-      for (BlockMetaData block : footer.getBlocks()) {
-        if (offsets.contains(block.getStartingPos())) {
-          filteredBlocks.add(block);
-        }
-      }
-      // verify we found them all
-      if (filteredBlocks.size() != rowGroupOffsets.length) {
-        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
-        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
-          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
-        }
-        // this should never happen.
-        // provide a good error message in case there's a bug
+    MetadataFilter metadataFilter = (rowGroupOffsets != null ?
+        offsets(rowGroupOffsets) :
+        range(split.getStart(), split.getEnd()));
+
+    // open a reader with the metadata filter
+    ParquetFileReader reader = ParquetFileReader.open(
+        configuration, path, metadataFilter);
+
+    if (rowGroupOffsets != null) {
+      // verify a row group was found for each offset
+      List<BlockMetaData> blocks = reader.getFooter().getBlocks();
+      if (blocks.size() != rowGroupOffsets.length) {
         throw new IllegalStateException(
-            "All the offsets listed in the split should be found in the file."
+            "All of the offsets in the split should be found in the file."
             + " expected: " + Arrays.toString(rowGroupOffsets)
-            + " found: " + filteredBlocks
-            + " out of: " + Arrays.toString(foundRowGroupOffsets)
-            + " in range " + split.getStart() + ", " + split.getEnd());
+            + " found: " + blocks);
       }
+
+    } else {
+      // apply data filters
+      reader.filterRowGroups(getFilter(configuration));
     }
 
-    if (!filteredBlocks.isEmpty()) {
-      checkDeltaByteArrayProblem(footer.getFileMetaData(), configuration, filteredBlocks.get(0));
+    if (!reader.getRowGroups().isEmpty()) {
+      checkDeltaByteArrayProblem(
+          reader.getFooter().getFileMetaData(), configuration,
+          reader.getRowGroups().get(0));
     }
 
-    MessageType fileSchema = footer.getFileMetaData().getSchema();
-    internalReader.initialize(
-        fileSchema, footer.getFileMetaData(), path, filteredBlocks, configuration);
+    internalReader.initialize(reader, configuration);
   }
 
   private void checkDeltaByteArrayProblem(FileMetaData meta, Configuration conf, BlockMetaData block) {

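The hunk above replaces the hand-rolled offset matching with a footer MetadataFilter. A minimal, hedged sketch of that selection as a standalone helper (the class and method names are illustrative, not part of the commit) could look like:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
    import org.apache.parquet.hadoop.ParquetFileReader;

    import static org.apache.parquet.format.converter.ParquetMetadataConverter.offsets;
    import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;

    class SplitOpenSketch {
      // Client-side metadata: the split already names its row-group offsets.
      // Task-side metadata: read only footer entries inside the split's byte range.
      static ParquetFileReader openForSplit(Configuration conf, Path path,
          long[] rowGroupOffsets, long start, long end) throws IOException {
        MetadataFilter metadataFilter = (rowGroupOffsets != null)
            ? offsets(rowGroupOffsets)
            : range(start, end);
        return ParquetFileReader.open(conf, path, metadataFilter);
      }
    }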
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/4b1ff8f4/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
new file mode 100644
index 0000000..754da68
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.dictionarylevel;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
+import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
+import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
+import org.apache.parquet.filter2.predicate.Operators.IntColumn;
+import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
+import static org.apache.parquet.filter2.predicate.FilterApi.*;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+public class DictionaryFilterTest {
+
+  private static final int nElements = 1000;
+  private static final Configuration conf = new Configuration();
+  private static final Path file = new Path("target/test/TestDictionaryFilter/testParquetFile");
+  private static final MessageType schema = parseMessageType(
+      "message test { "
+          + "required binary binary_field; "
+          + "required binary single_value_field; "
+          + "required int32 int32_field; "
+          + "required int64 int64_field; "
+          + "required double double_field; "
+          + "required float float_field; "
+          + "required int32 plain_int32_field; "
+          + "required binary fallback_binary_field; "
+          + "} ");
+
+  private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";
+  private static final int[] intValues = new int[] {
+      -100, 302, 3333333, 7654321, 1234567, -2000, -77775, 0, 75, 22223,
+      77, 22221, -444443, 205, 12, 44444, 889, 66665, -777889, -7,
+      52, 33, -257, 1111, 775, 26};
+  private static final long[] longValues = new long[] {
+      -100L, 302L, 3333333L, 7654321L, 1234567L, -2000L, -77775L, 0L,
+      75L, 22223L, 77L, 22221L, -444443L, 205L, 12L, 44444L, 889L, 66665L,
+      -777889L, -7L, 52L, 33L, -257L, 1111L, 775L, 26L};
+
+  private static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer) throws IOException {
+    for (int i = 0; i < nElements; i++) {
+      int index = i % ALPHABET.length();
+
+      Group group = f.newGroup()
+          .append("binary_field", ALPHABET.substring(index, index+1))
+          .append("single_value_field", "sharp")
+          .append("int32_field", intValues[i % intValues.length])
+          .append("int64_field", longValues[i % longValues.length])
+          .append("double_field", toDouble(intValues[i % intValues.length]))
+          .append("float_field", toFloat(intValues[i % intValues.length]))
+          .append("plain_int32_field", i)
+          .append("fallback_binary_field", i < (nElements / 2) ?
+              ALPHABET.substring(index, index+1) : UUID.randomUUID().toString());
+
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  @BeforeClass
+  public static void prepareFile() throws IOException {
+    cleanup();
+
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .withWriterVersion(PARQUET_1_0)
+        .withCompressionCodec(UNCOMPRESSED)
+        .withRowGroupSize(1024*1024)
+        .withPageSize(1024)
+        .enableDictionaryEncoding()
+        .withDictionaryPageSize(2*1024)
+        .withConf(conf)
+        .build();
+    writeData(f, writer);
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      fs.delete(file, true);
+    }
+  }
+
+
+  List<ColumnChunkMetaData> ccmd;
+  ParquetFileReader reader;
+  DictionaryPageReadStore dictionaries;
+
+  @Before
+  public void setUp() throws Exception {
+    reader = ParquetFileReader.open(conf, file);
+    ParquetMetadata meta = reader.getFooter();
+    ccmd = meta.getBlocks().get(0).getColumns();
+    dictionaries = reader.getDictionaryReader(meta.getBlocks().get(0));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    reader.close();
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testDictionaryEncodedColumns() throws Exception {
+    Set<String> dictionaryEncodedColumns = new HashSet<String>(Arrays.asList(
+        "binary_field", "single_value_field", "int32_field", "int64_field",
+        "double_field", "float_field"));
+    for (ColumnChunkMetaData column : ccmd) {
+      String name = column.getPath().toDotString();
+      if (dictionaryEncodedColumns.contains(name)) {
+        assertTrue("Column should be dictionary encoded: " + name,
+            column.getEncodings().contains(Encoding.PLAIN_DICTIONARY));
+        assertFalse("Column should not have plain data pages" + name,
+            column.getEncodings().contains(Encoding.PLAIN));
+
+      } else {
+        assertTrue("Column should have plain encoding: " + name,
+            column.getEncodings().contains(Encoding.PLAIN));
+
+        if (name.startsWith("fallback")) {
+          assertTrue("Column should be have some dictionary encoding: " + name,
+              column.getEncodings().contains(Encoding.PLAIN_DICTIONARY));
+        } else {
+          assertFalse("Column should have no dictionary encoding: " + name,
+              column.getEncodings().contains(Encoding.PLAIN_DICTIONARY));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testEqBinary() throws Exception {
+    BinaryColumn b = binaryColumn("binary_field");
+    FilterPredicate pred = eq(b, Binary.fromString("c"));
+
+    assertFalse("Should not drop block for lower case letters",
+        canDrop(pred, ccmd, dictionaries));
+
+    assertTrue("Should drop block for upper case letters",
+        canDrop(eq(b, Binary.fromString("A")), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testNotEqBinary() throws Exception {
+    BinaryColumn sharp = binaryColumn("single_value_field");
+    BinaryColumn b = binaryColumn("binary_field");
+
+    assertTrue("Should drop block with only the excluded value",
+        canDrop(notEq(sharp, Binary.fromString("sharp")), ccmd, dictionaries));
+
+    assertFalse("Should not drop block with any other value",
+        canDrop(notEq(sharp, Binary.fromString("applause")), ccmd, dictionaries));
+
+    assertFalse("Should not drop block with a known value",
+        canDrop(notEq(b, Binary.fromString("x")), ccmd, dictionaries));
+
+    assertFalse("Should not drop block with a known value",
+        canDrop(notEq(b, Binary.fromString("B")), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testLtInt() throws Exception {
+    IntColumn i32 = intColumn("int32_field");
+    int lowest = Integer.MAX_VALUE;
+    for (int value : intValues) {
+      lowest = Math.min(lowest, value);
+    }
+
+    assertTrue("Should drop: < lowest value",
+        canDrop(lt(i32, lowest), ccmd, dictionaries));
+    assertFalse("Should not drop: < (lowest value + 1)",
+        canDrop(lt(i32, lowest + 1), ccmd, dictionaries));
+
+    assertFalse("Should not drop: contains matching values",
+        canDrop(lt(i32, Integer.MAX_VALUE), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testLtEqLong() throws Exception {
+    LongColumn i64 = longColumn("int64_field");
+    long lowest = Long.MAX_VALUE;
+    for (long value : longValues) {
+      lowest = Math.min(lowest, value);
+    }
+
+    assertTrue("Should drop: <= lowest - 1",
+        canDrop(ltEq(i64, lowest - 1), ccmd, dictionaries));
+    assertFalse("Should not drop: <= lowest",
+        canDrop(ltEq(i64, lowest), ccmd, dictionaries));
+
+    assertFalse("Should not drop: contains matching values",
+        canDrop(ltEq(i64, Long.MAX_VALUE), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testGtFloat() throws Exception {
+    FloatColumn f = floatColumn("float_field");
+    float highest = Float.MIN_VALUE;
+    for (int value : intValues) {
+      highest = Math.max(highest, toFloat(value));
+    }
+
+    assertTrue("Should drop: > highest value",
+        canDrop(gt(f, highest), ccmd, dictionaries));
+    assertFalse("Should not drop: > (highest value - 1.0)",
+        canDrop(gt(f, highest - 1.0f), ccmd, dictionaries));
+
+    assertFalse("Should not drop: contains matching values",
+        canDrop(gt(f, Float.MIN_VALUE), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testGtEqDouble() throws Exception {
+    DoubleColumn d = doubleColumn("double_field");
+    double highest = Double.MIN_VALUE;
+    for (int value : intValues) {
+      highest = Math.max(highest, toDouble(value));
+    }
+
+    assertTrue("Should drop: >= highest + 0.00000001",
+        canDrop(gtEq(d, highest + 0.00000001), ccmd, dictionaries));
+    assertFalse("Should not drop: >= highest",
+        canDrop(gtEq(d, highest), ccmd, dictionaries));
+
+    assertFalse("Should not drop: contains matching values",
+        canDrop(gtEq(d, Double.MIN_VALUE), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testAnd() throws Exception {
+    BinaryColumn col = binaryColumn("binary_field");
+
+    // both evaluate to false (no upper-case letters are in the dictionary)
+    FilterPredicate B = eq(col, Binary.fromString("B"));
+    FilterPredicate C = eq(col, Binary.fromString("C"));
+
+    // both evaluate to true (all lower-case letters are in the dictionary)
+    FilterPredicate x = eq(col, Binary.fromString("x"));
+    FilterPredicate y = eq(col, Binary.fromString("y"));
+
+    assertTrue("Should drop when either predicate must be false",
+        canDrop(and(B, y), ccmd, dictionaries));
+    assertTrue("Should drop when either predicate must be false",
+        canDrop(and(x, C), ccmd, dictionaries));
+    assertTrue("Should drop when either predicate must be false",
+        canDrop(and(B, C), ccmd, dictionaries));
+    assertFalse("Should not drop when either predicate could be true",
+        canDrop(and(x, y), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testOr() throws Exception {
+    BinaryColumn col = binaryColumn("binary_field");
+
+    // both evaluate to false (no upper-case letters are in the dictionary)
+    FilterPredicate B = eq(col, Binary.fromString("B"));
+    FilterPredicate C = eq(col, Binary.fromString("C"));
+
+    // both evaluate to true (all lower-case letters are in the dictionary)
+    FilterPredicate x = eq(col, Binary.fromString("x"));
+    FilterPredicate y = eq(col, Binary.fromString("y"));
+
+    assertFalse("Should not drop when one predicate could be true",
+        canDrop(or(B, y), ccmd, dictionaries));
+    assertFalse("Should not drop when one predicate could be true",
+        canDrop(or(x, C), ccmd, dictionaries));
+    assertTrue("Should drop when both predicates must be false",
+        canDrop(or(B, C), ccmd, dictionaries));
+    assertFalse("Should not drop when one predicate could be true",
+        canDrop(or(x, y), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testColumnWithoutDictionary() throws Exception {
+    IntColumn plain = intColumn("plain_int32_field");
+    DictionaryPageReadStore dictionaryStore = mock(DictionaryPageReadStore.class);
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(eq(plain, -10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(lt(plain, -10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(ltEq(plain, -10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(gt(plain, nElements + 10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(gtEq(plain, nElements + 10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(notEq(plain, nElements + 10), ccmd, dictionaryStore));
+
+    verifyZeroInteractions(dictionaryStore);
+  }
+
+  @Test
+  public void testColumnWithDictionaryAndPlainEncodings() throws Exception {
+    IntColumn plain = intColumn("fallback_binary_field");
+    DictionaryPageReadStore dictionaryStore = mock(DictionaryPageReadStore.class);
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(eq(plain, -10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(lt(plain, -10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(ltEq(plain, -10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(gt(plain, nElements + 10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(gtEq(plain, nElements + 10), ccmd, dictionaryStore));
+
+    assertFalse("Should never drop block using plain encoding",
+        canDrop(notEq(plain, nElements + 10), ccmd, dictionaryStore));
+
+    verifyZeroInteractions(dictionaryStore);
+  }
+
+  private static double toDouble(int value) {
+    return (value * 1.0);
+  }
+
+  private static float toFloat(int value) {
+    return (float) (value * 2.0);
+  }
+}


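To tie the new pieces together: the test above exercises DictionaryFilter.canDrop against a real file. A minimal, hedged sketch of the same call pattern outside a test, mirroring the test's setUp (the file path and predicate are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.page.DictionaryPageReadStore;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.io.api.Binary;

    import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    public class DictionaryDropSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path(args[0]);  // illustrative input file
        FilterPredicate pred = eq(binaryColumn("binary_field"), Binary.fromString("A"));

        ParquetFileReader reader = ParquetFileReader.open(conf, file);
        try {
          for (BlockMetaData block : reader.getFooter().getBlocks()) {
            // the reader serves this row group's dictionary pages (see DictionaryPageReader)
            DictionaryPageReadStore dictionaries = reader.getDictionaryReader(block);
            boolean drop = canDrop(pred, block.getColumns(), dictionaries);
            System.out.println("row group at " + block.getStartingPos()
                + (drop ? " can be skipped" : " must be read"));
          }
        } finally {
          reader.close();
        }
      }
    }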