parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject parquet-mr git commit: PARQUET-654: Add option to disable record-level filtering.
Date Wed, 13 Jul 2016 21:50:40 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master bd0b5af02 -> e036d60d8


PARQUET-654: Add option to disable record-level filtering.

This can be used by frameworks that use codegen for filtering to avoid
running filters within Parquet.

Author: Ryan Blue <blue@apache.org>

Closes #353 from rdblue/PARQUET-654-add-record-level-filter-option and squashes the following commits:

b497e7f [Ryan Blue] PARQUET-654: Add option to disable record-level filtering.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/e036d60d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/e036d60d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/e036d60d

Branch: refs/heads/master
Commit: e036d60d8a210d5ac28b2e5c51a45ceb82b58f09
Parents: bd0b5af
Author: Ryan Blue <blue@apache.org>
Authored: Wed Jul 13 14:50:08 2016 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Wed Jul 13 14:50:08 2016 -0700

----------------------------------------------------------------------
 .../hadoop/InternalParquetRecordReader.java       |  8 +++++++-
 .../apache/parquet/hadoop/ParquetFileReader.java  | 10 ++++++++--
 .../apache/parquet/hadoop/ParquetInputFormat.java | 18 ++++++++++++++++++
 3 files changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e036d60d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index f74e57c..d43fd7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -46,6 +46,8 @@ import org.apache.parquet.schema.MessageType;
 import static java.lang.String.format;
 import static org.apache.parquet.Log.DEBUG;
 import static org.apache.parquet.Preconditions.checkNotNull;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED_DEFAULT;
 import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 
 class InternalParquetRecordReader<T> {
@@ -53,6 +55,7 @@ class InternalParquetRecordReader<T> {
 
   private ColumnIOFactory columnIOFactory = null;
   private final Filter filter;
+  private boolean filterRecords = true;
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
@@ -130,7 +133,8 @@ class InternalParquetRecordReader<T> {
       if (Log.INFO) LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
       MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
-      recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
+      recordReader = columnIO.getRecordReader(pages, recordConverter,
+          filterRecords ? filter : FilterCompat.NOOP);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
       ++ currentBlock;
@@ -173,6 +177,8 @@ class InternalParquetRecordReader<T> {
     this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
     this.total = reader.getRecordCount();
     this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
+    this.filterRecords = configuration.getBoolean(
+        RECORD_FILTERING_ENABLED, RECORD_FILTERING_ENABLED_DEFAULT);
     LOG.info("RecordReader initialized will read a total of " + total + " records.");
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e036d60d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7ac1706..a761f2e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -28,6 +28,10 @@ import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromP
 import static org.apache.parquet.hadoop.ParquetFileWriter.MAGIC;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_COMMON_METADATA_FILE;
 import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED_DEFAULT;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED_DEFAULT;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -621,11 +625,13 @@ public class ParquetFileReader implements Closeable {
     // set up data filters based on configured levels
     List<RowGroupFilter.FilterLevel> levels = new ArrayList<RowGroupFilter.FilterLevel>();
 
-    if (conf.getBoolean("parquet.filter.statistics.enabled", true)) {
+    if (conf.getBoolean(
+        STATS_FILTERING_ENABLED, STATS_FILTERING_ENABLED_DEFAULT)) {
       levels.add(STATISTICS);
     }
 
-    if (conf.getBoolean("parquet.filter.dictionary.enabled", false)) {
+    if (conf.getBoolean(
+        DICTIONARY_FILTERING_ENABLED, DICTIONARY_FILTERING_ENABLED_DEFAULT)) {
       levels.add(DICTIONARY);
     }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/e036d60d/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index e3536d7..1fe57f9 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -116,6 +116,24 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
 
   /**
+   * key to configure whether record-level filtering is enabled
+   */
+  public static final String RECORD_FILTERING_ENABLED = "parquet.filter.record-level.enabled";
+  static final boolean RECORD_FILTERING_ENABLED_DEFAULT = true;
+
+  /**
+   * key to configure whether row group stats filtering is enabled
+   */
+  public static final String STATS_FILTERING_ENABLED = "parquet.filter.stats.enabled";
+  static final boolean STATS_FILTERING_ENABLED_DEFAULT = true;
+
+  /**
+   * key to configure whether row group dictionary filtering is enabled
+   */
+  public static final String DICTIONARY_FILTERING_ENABLED = "parquet.filter.dictionary.enabled";
+  static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = false;
+
+  /**
    * key to turn on or off task side metadata loading (default true)
    * if true then metadata is read on the task side and some tasks may finish immediately.
   * if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawn if there is work to do.


Mime
View raw message