parquet-commits mailing list archives

From GitBox <...@apache.org>
Subject [parquet-mr] Diff for: [GitHub] cjjnjust closed pull request #521: PARQUET-1328: Add Bloom filter reader and writer
Date Mon, 14 Jan 2019 08:47:33 GMT
diff --git a/.travis.yml b/.travis.yml
index e4e623f03..17d7ee7db 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -29,6 +29,10 @@ before_install:
   - sudo make install
   - cd ..
   - date
+  - git clone https://github.com/apache/parquet-format.git
+  - cd parquet-format
+  - mvn install -DskipTests
+  - cd ..
 
 env:
   - HADOOP_PROFILE=default TEST_CODECS=uncompressed,brotli
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index cfdc5553d..83c0118ec 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -93,7 +93,6 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
-      <scope>test</scope>
     </dependency>
   </dependencies>
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 41e482cfd..4df5b7126 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -35,6 +35,8 @@
 import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.HashMap;
+
 /**
  * This class represents all the configurable Parquet properties.
  */
@@ -48,6 +50,8 @@
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+  public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
+
   public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
 
   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
@@ -86,11 +90,16 @@ public static WriterVersion fromString(String name) {
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
+  private final boolean enableBloomFilter;
+
+  // Maps a column name to its expected number of distinct values (NDV) in a row group.
+  private final HashMap<String, Long> bloomFilterExpectedDistinctNumbers;
   private final int pageRowCountLimit;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
+                            boolean enableBloomFilter, HashMap<String, Long> bloomFilterExpectedDistinctNumbers) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -104,6 +113,9 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
 
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+
+    this.enableBloomFilter = enableBloomFilter;
+    this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumbers;
     this.pageRowCountLimit = pageRowCountLimit;
   }
 
@@ -201,6 +213,14 @@ public int getPageRowCountLimit() {
     return pageRowCountLimit;
   }
 
+  public boolean isBloomFilterEnabled() {
+    return enableBloomFilter;
+  }
+
+  public HashMap<String, Long> getBloomFilterColumnExpectedNDVs() {
+    return bloomFilterExpectedDistinctNumbers;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -220,6 +240,8 @@ public static Builder copy(ParquetProperties toCopy) {
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+    private boolean enableBloomFilter = DEFAULT_BLOOM_FILTER_ENABLED;
+    private HashMap<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
 
     private Builder() {
@@ -236,6 +258,8 @@ private Builder(ParquetProperties toCopy) {
       this.valuesWriterFactory = toCopy.valuesWriterFactory;
       this.allocator = toCopy.allocator;
       this.pageRowCountLimit = toCopy.pageRowCountLimit;
+      this.enableBloomFilter = toCopy.enableBloomFilter;
+      this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
     }
 
     /**
@@ -324,6 +348,27 @@ public Builder withColumnIndexTruncateLength(int length) {
       return this;
     }
 
+    /**
+     * Enable or disable the Bloom filter.
+     *
+     * @param enableBloomFilter whether to enable the Bloom filter.
+     * @return this builder for method chaining.
+     */
+    public Builder withBloomFilterEnabled(boolean enableBloomFilter) {
+      this.enableBloomFilter = enableBloomFilter;
+      return this;
+    }
+
+    /**
+     * Set Bloom filter info for columns.
+     *
+     * @param columnExpectedNDVs a map from column name to its expected number of distinct values in a row group
+     * @return this builder for method chaining
+     */
+    public Builder withBloomFilterInfo(HashMap<String, Long> columnExpectedNDVs) {
+      this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs;
+      return this;
+    }
+
     public Builder withPageRowCountLimit(int rowCount) {
       Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
       pageRowCountLimit = rowCount;
@@ -334,7 +379,8 @@ public ParquetProperties build() {
       ParquetProperties properties =
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
+          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit,
+          enableBloomFilter, bloomFilterColumnExpectedNDVs);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
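For illustration, a minimal sketch (not part of the patch) of how the builder methods above might be used to enable Bloom filters on the write path; the column name "user_id" and its NDV are hypothetical:

    public class BloomFilterPropsExample {
      public static void main(String[] args) {
        // Map each column name to its expected number of distinct values per row group.
        java.util.HashMap<String, Long> ndvs = new java.util.HashMap<>();
        ndvs.put("user_id", 100000L); // hypothetical column and NDV

        org.apache.parquet.column.ParquetProperties props =
            org.apache.parquet.column.ParquetProperties.builder()
                .withBloomFilterEnabled(true) // added by this patch
                .withBloomFilterInfo(ndvs)    // added by this patch
                .build();
        System.out.println(props.isBloomFilterEnabled()); // prints true
      }
    }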
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index ac9aaca26..d7019cd76 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -34,6 +34,8 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -74,7 +76,7 @@
       public ColumnWriter getColumnWriter(ColumnDescriptor path) {
         ColumnWriterBase column = columns.get(path);
         if (column == null) {
-          column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
+          column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
           columns.put(path, column);
         }
         return column;
@@ -91,7 +93,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, createColumnWriter(path, pageWriter, props));
+      mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
     }
     this.columns = unmodifiableMap(mcolumns);
 
@@ -105,7 +107,38 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     };
   }
 
-  abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
+  // The Bloom filter is written to a dedicated bitset rather than to pages, so it needs a separate write store abstraction.
+  ColumnWriteStoreBase(
+    MessageType schema,
+    PageWriteStore pageWriteStore,
+    BloomFilterWriteStore bloomFilterWriteStore,
+    ParquetProperties props) {
+    this.props = props;
+    this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+    Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+      if (props.isBloomFilterEnabled() && props.getBloomFilterColumnExpectedNDVs() != null) {
+        BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
+        mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
+      } else {
+        mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+      }
+    }
+    this.columns = unmodifiableMap(mcolumns);
+
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+
+    columnWriterProvider = new ColumnWriterProvider() {
+      @Override
+      public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+        return columns.get(path);
+      }
+    };
+  }
+
+  abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+                                               BloomFilterWriter bloomFilterWriter, ParquetProperties props);
 
   public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     return columnWriterProvider.getColumnWriter(path);
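Since BloomFilterWriteStore and BloomFilterWriter (both introduced later in this diff) are single-method interfaces, a hypothetical in-memory stub can illustrate the shape of the abstraction wired in above; a real write store persists the bitset into the file:

    import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;

    public class WriteStoreStubSketch {
      public static void main(String[] args) {
        // The outer lambda implements getBloomFilterWriter(path); the inner one
        // implements writeBloomFilter(bloomFilter).
        BloomFilterWriteStore store = path -> bloomFilter ->
            System.out.println("would persist " + bloomFilter.getBitsetSize()
                + " bytes for column " + path);
      }
    }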
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index 7258423fb..dd13b0b8a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,10 +22,11 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.schema.MessageType;
 
 public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
-
   public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
     super(schema, pageWriteStore, props);
   }
@@ -36,8 +37,15 @@ public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
     super(pageWriteStore, props);
   }
 
+  public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
+                            BloomFilterWriteStore bloomFilterWriteStore,
+                            ParquetProperties props) {
+    super(schema, pageWriteStore, bloomFilterWriteStore, props);
+  }
+
   @Override
-  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-    return new ColumnWriterV1(path, pageWriter, props);
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+                                      BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index bf1090d0b..a9f2d5848 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,16 +22,24 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.schema.MessageType;
 
 public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
-
   public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
     super(schema, pageWriteStore, props);
   }
 
+  public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
+                            BloomFilterWriteStore bloomFilterWriteStore,
+                            ParquetProperties props) {
+    super(schema, pageWriteStore, bloomFilterWriteStore, props);
+  }
+
   @Override
-  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-    return new ColumnWriterV2(path, pageWriter, props);
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+                                      BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 3788c82e4..c03b04fc5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.impl;
 
 import java.io.IOException;
+import java.util.HashMap;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
@@ -27,6 +28,9 @@
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
@@ -53,6 +57,9 @@
   private long rowsWrittenSoFar = 0;
   private int pageRowCount;
 
+  private BloomFilterWriter bloomFilterWriter;
+  private BloomFilter bloomFilter;
+
   ColumnWriterBase(
       ColumnDescriptor path,
       PageWriter pageWriter,
@@ -66,6 +73,29 @@
     this.dataColumn = props.newValuesWriter(path);
   }
 
+  ColumnWriterBase(
+    ColumnDescriptor path,
+    PageWriter pageWriter,
+    BloomFilterWriter bloomFilterWriter,
+    ParquetProperties props
+  ) {
+    this(path, pageWriter, props);
+
+    // Bloom filters don't support nested columns yet; see PARQUET-1453.
+    if (path.getPath().length != 1 || bloomFilterWriter == null) {
+      return;
+    }
+
+    this.bloomFilterWriter = bloomFilterWriter;
+    HashMap<String, Long> bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs();
+    String column = path.getPath()[0];
+    if (bloomFilterColumnExpectedNDVs.containsKey(column)) {
+      int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column),
+        BlockSplitBloomFilter.DEFAULT_FPP);
+      this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8);
+    }
+  }
+
   abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);
 
   abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
@@ -122,6 +152,36 @@ public long getBufferedSizeInMemory() {
         + pageWriter.getMemSize();
   }
 
+  private void updateBloomFilter(int value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(long value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(double value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(float value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(Binary value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
   /**
    * Writes the current value
    *
@@ -137,6 +197,7 @@ public void write(double value, int repetitionLevel, int definitionLevel) {
     definitionLevel(definitionLevel);
     dataColumn.writeDouble(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -155,6 +216,7 @@ public void write(float value, int repetitionLevel, int definitionLevel) {
     definitionLevel(definitionLevel);
     dataColumn.writeFloat(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -173,6 +235,7 @@ public void write(Binary value, int repetitionLevel, int definitionLevel) {
     definitionLevel(definitionLevel);
     dataColumn.writeBytes(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -209,6 +272,7 @@ public void write(int value, int repetitionLevel, int definitionLevel) {
     definitionLevel(definitionLevel);
     dataColumn.writeInteger(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -227,6 +291,7 @@ public void write(long value, int repetitionLevel, int definitionLevel) {
     definitionLevel(definitionLevel);
     dataColumn.writeLong(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -246,6 +311,10 @@ void finalizeColumnChunk() {
       }
       dataColumn.resetDictionary();
     }
+
+    if (bloomFilterWriter != null && bloomFilter != null) {
+      bloomFilterWriter.writeBloomFilter(bloomFilter);
+    }
   }
 
   /**
@@ -265,20 +334,24 @@ long getCurrentPageBufferedSize() {
    * @return the number of bytes of memory used to buffer the current data and the previously written pages
    */
   long getTotalBufferedSize() {
+    long bloomBufferSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
     return repetitionLevelColumn.getBufferedSize()
         + definitionLevelColumn.getBufferedSize()
         + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
+        + pageWriter.getMemSize()
+        + bloomBufferSize;
   }
 
   /**
    * @return actual memory used
    */
   long allocatedSize() {
+    long bloomAllocatedSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
     return repetitionLevelColumn.getAllocatedSize()
         + definitionLevelColumn.getAllocatedSize()
         + dataColumn.getAllocatedSize()
-        + pageWriter.allocatedSize();
+        + pageWriter.allocatedSize()
+        + bloomAllocatedSize;
   }
 
   /**
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index 646e31aa7..1d732b837 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,16 +27,21 @@
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 
 /**
  * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
  */
 final class ColumnWriterV1 extends ColumnWriterBase {
-
   ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
     super(path, pageWriter, props);
   }
 
+  ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
+                 BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    super(path, pageWriter, bloomFilterWriter, props);
+  }
+
   @Override
   ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
     return props.newRepetitionLevelWriter(path);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index 04076c96b..8e9e6f7fa 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,6 +29,7 @@
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
@@ -54,12 +55,18 @@ public BytesInput getBytes() {
     }
   }
 
-  private static final ValuesWriter NULL_WRITER = new DevNullValuesWriter();
 
   ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
     super(path, pageWriter, props);
   }
 
+  private static final ValuesWriter NULL_WRITER = new DevNullValuesWriter();
+
+  ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter,
+                 ParquetProperties props) {
+    super(path, pageWriter, bloomFilterWriter, props);
+  }
+
   @Override
   ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
     return path.getMaxRepetitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newRepetitionLevelEncoder(path));
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
new file mode 100644
index 000000000..b6378976c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.api.Binary;
+
+/*
+ * This Bloom filter is implemented using the block-based Bloom filter algorithm from Putze et al.'s
+ * "Cache-, Hash- and Space-Efficient Bloom Filters". The basic idea is to hash each item to a tiny
+ * Bloom filter whose size fits in a single cache line or smaller. This implementation sets 8 bits in
+ * each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take advantage of 32-byte SIMD
+ * instructions.
+ */
+public class BlockSplitBloomFilter implements BloomFilter {
+  // Bytes in a tiny Bloom filter block.
+  private static final int BYTES_PER_BLOCK = 32;
+
+  // Default seed for the hash function. It comes from System.nanoTime().
+  private static final int DEFAULT_SEED = 1361930890;
+
+  // Minimum Bloom filter size, set to the size of a tiny Bloom filter block
+  public static final int MINIMUM_BYTES = 32;
+
+  // Maximum Bloom filter size, set to the default HDFS block size for the upper boundary check.
+  // This should be reconsidered when implementing the write-side logic.
+  public static final int MAXIMUM_BYTES = 128 * 1024 * 1024;
+
+  // The number of bits to set in a tiny Bloom filter
+  private static final int BITS_SET_PER_BLOCK = 8;
+
+  // The metadata in the header of a serialized Bloom filter is three four-byte values: the number of bytes,
+  // the hash strategy, and the algorithm.
+  public static final int HEADER_SIZE = 12;
+
+  // The default false positive probability value
+  public static final double DEFAULT_FPP = 0.01;
+
+  // Hash strategy used in this Bloom filter.
+  public final HashStrategy hashStrategy;
+
+  // The underlying byte array for Bloom filter bitset.
+  private byte[] bitset;
+
+  // An IntBuffer view of the underlying bitset to help set bits.
+  private IntBuffer intBuffer;
+
+  // Hash function used to compute hashes of column values.
+  private HashFunction hashFunction;
+
+  // The block-based algorithm needs 8 odd SALT values to calculate eight indexes
+  // of bits to set, one per 32-bit word.
+  private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+  /**
+   * Constructor of Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. numBytes should be within
+   *                 [MINIMUM_BYTES, MAXIMUM_BYTES]; it is clamped to those bounds if out of range
+   *                 and rounded up to a power of 2. The filter uses murmur3_x64_128 as its default
+   *                 hash function.
+   */
+  public BlockSplitBloomFilter(int numBytes) {
+    this(numBytes, HashStrategy.MURMUR3_X64_128);
+  }
+
+  /**
+   * Constructor of the block-based Bloom filter with a given hash strategy.
+   *
+   * @param numBytes The number of bytes for Bloom filter bitset
+   * @param hashStrategy The hash strategy of Bloom filter.
+   */
+  private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
+    initBitset(numBytes);
+    switch (hashStrategy) {
+      case MURMUR3_X64_128:
+        this.hashStrategy = hashStrategy;
+        hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+  }
+
+  /**
+   * Construct the Bloom filter from a given bitset; used when reconstructing a
+   * Bloom filter from a Parquet file. It uses murmur3_x64_128 as its default hash
+   * function.
+   *
+   * @param bitset The bitset from which to construct the Bloom filter.
+   */
+  public BlockSplitBloomFilter(byte[] bitset) {
+    this(bitset, HashStrategy.MURMUR3_X64_128);
+  }
+
+  /**
+   * Construct the Bloom filter from a given bitset; used when reconstructing a
+   * Bloom filter from a Parquet file.
+   *
+   * @param bitset The bitset from which to construct the Bloom filter.
+   * @param hashStrategy The hash strategy the Bloom filter applies.
+   */
+  private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) {
+    if (bitset == null) {
+      throw new RuntimeException("Given bitset is null");
+    }
+
+    this.bitset = bitset;
+    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+    switch (hashStrategy) {
+      case MURMUR3_X64_128:
+        this.hashStrategy = hashStrategy;
+        hashFunction = Hashing.murmur3_128(DEFAULT_SEED);
+        break;
+      default:
+        throw new RuntimeException("Not supported hash strategy");
+    }
+  }
+
+  /**
+   * Create a new bitset for Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. numBytes should be within
+   *                 [MINIMUM_BYTES, MAXIMUM_BYTES]; it is clamped to those bounds if out of range
+   *                 and rounded up to a power of 2.
+   */
+  private void initBitset(int numBytes) {
+    if (numBytes < MINIMUM_BYTES) {
+      numBytes = MINIMUM_BYTES;
+    }
+    // Round up to the next power of 2 if it is not already one.
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes) << 1;
+    }
+    if (numBytes > MAXIMUM_BYTES || numBytes < 0) {
+      numBytes = MAXIMUM_BYTES;
+    }
+    this.bitset = new byte[numBytes];
+    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    // Write number of bytes of bitset.
+    out.write(BytesUtils.intToBytes(bitset.length));
+    // Write hash strategy
+    out.write(BytesUtils.intToBytes(hashStrategy.value));
+    // Write algorithm
+    out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value));
+    // Write bitset
+    out.write(bitset);
+  }
+
+  private int[] setMask(int key) {
+    int[] mask = new int[BITS_SET_PER_BLOCK];
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = key * SALT[i];
+    }
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = mask[i] >>> 27;
+    }
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = 0x1 << mask[i];
+    }
+
+    return mask;
+  }
+
+  @Override
+  public void insertHash(long hash) {
+    int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+    int key = (int)hash;
+
+    // Calculate mask for bucket.
+    int[] mask = setMask(key);
+    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+      int value = intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i);
+      value |= mask[i];
+      intBuffer.put(bucketIndex * (BYTES_PER_BLOCK / 4) + i, value);
+    }
+  }
+
+  @Override
+  public boolean findHash(long hash) {
+    int bucketIndex = (int)(hash >> 32) & (bitset.length / BYTES_PER_BLOCK - 1);
+    int key = (int)hash;
+
+    // Calculate mask for the tiny Bloom filter.
+    int[] mask = setMask(key);
+    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+      if (0 == (intBuffer.get(bucketIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Calculate optimal size according to the number of distinct values and false positive probability.
+   *
+   * @param n The number of distinct values.
+   * @param p The false positive probability.
+   * @return The optimal number of bits for the given n and p, rounded to a power of 2.
+   */
+  public static int optimalNumOfBits(long n, double p) {
+    Preconditions.checkArgument((p > 0.0 && p < 1.0),
+      "FPP should be less than 1.0 and great than 0.0");
+    final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
+    final double MAX = MAXIMUM_BYTES << 3;
+    int numBits = (int)m;
+
+    // Handle overflow.
+    if (m > MAX || m < 0) {
+      numBits = (int)MAX;
+    }
+    // Get next power of 2 if bits is not power of 2.
+    if ((numBits & (numBits - 1)) != 0) {
+      numBits = Integer.highestOneBit(numBits) << 1;
+    }
+    if (numBits < (MINIMUM_BYTES << 3)) {
+      numBits = MINIMUM_BYTES << 3;
+    }
+
+    return numBits;
+  }
+
+  @Override
+  public long hash(int value) {
+    ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE / Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  @Override
+  public long hash(long value) {
+    ByteBuffer plain = ByteBuffer.allocate(Long.SIZE / Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  @Override
+  public long hash(double value) {
+    ByteBuffer plain = ByteBuffer.allocate(Double.SIZE / Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  @Override
+  public long hash(float value) {
+    ByteBuffer plain = ByteBuffer.allocate(Float.SIZE / Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
+    return hashFunction.hashBytes(plain.array()).asLong();
+  }
+
+  @Override
+  public long hash(Binary value) {
+    return hashFunction.hashBytes(value.getBytes()).asLong();
+  }
+
+  @Override
+  public long getBitsetSize() {
+    return this.bitset.length;
+  }
+}
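For illustration, a minimal usage sketch (not part of the patch) of the class above: size the bitset from an expected NDV, insert a value's hash, then probe for it. The NDV of 10000 is a made-up example:

    import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
    import org.apache.parquet.column.values.bloomfilter.BloomFilter;
    import org.apache.parquet.io.api.Binary;

    public class BlockSplitBloomFilterSketch {
      public static void main(String[] args) {
        int bits = BlockSplitBloomFilter.optimalNumOfBits(10000, BlockSplitBloomFilter.DEFAULT_FPP);
        // optimalNumOfBits returns bits; the constructor takes bytes.
        BloomFilter filter = new BlockSplitBloomFilter(bits / 8);
        long hash = filter.hash(Binary.fromString("parquet"));
        filter.insertHash(hash);
        System.out.println(filter.findHash(hash)); // true; an absent value may rarely return true too
      }
    }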
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
new file mode 100644
index 000000000..3ec192e3e
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.io.api.Binary;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A Bloom filter is a compact structure that indicates whether an item is definitely not in a set or
+ * probably in a set. A Bloom filter usually consists of a bitset that represents a set of elements,
+ * a hash strategy, and a Bloom filter algorithm.
+ */
+public interface BloomFilter {
+  // Bloom filter hash strategy.
+  enum HashStrategy {
+    MURMUR3_X64_128(0);
+
+    HashStrategy(int value) {
+      this.value = value;
+    }
+
+    final int value;
+  }
+
+  // Bloom filter algorithm.
+  enum Algorithm {
+    BLOCK(0);
+
+    Algorithm(int value) {
+      this.value = value;
+    }
+
+    final int value;
+  }
+
+  /**
+   * Write the Bloom filter to an output stream. It writes the Bloom filter header including the
+   * bitset's length in bytes, the hash strategy, the algorithm, and the bitset.
+   *
+   * @param out the output stream to write
+   */
+  void writeTo(OutputStream out) throws IOException;
+
+  /**
+   * Insert an element into the Bloom filter; the element is represented by
+   * the hash value of its plain encoding result.
+   *
+   * @param hash the hash result of the element.
+   */
+  void insertHash(long hash);
+
+  /**
+   * Determine whether an element is in the set or not.
+   *
+   * @param hash the hash value of the element's plain encoding result.
+   * @return false if the element is definitely not in the set, true if it probably is.
+   */
+  boolean findHash(long hash);
+
+  /**
+   * Compute hash for int value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(int value);
+
+  /**
+   * Compute hash for long value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(long value);
+
+  /**
+   * Compute hash for double value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(double value);
+
+  /**
+   * Compute hash for float value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(float value);
+
+  /**
+   * Compute hash for Binary value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(Binary value);
+
+  /**
+   * Get the number of bytes for bitset in this Bloom filter.
+   *
+   * @return The number of bytes for bitset in this Bloom filter.
+   */
+  long getBitsetSize();
+}
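As a hypothetical round-trip sketch (not part of the patch), the writeTo contract above produces a 12-byte little-endian header — bitset length, hash strategy, algorithm — followed by the bitset, which can be checked like this:

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
    import org.apache.parquet.column.values.bloomfilter.BloomFilter;

    public class HeaderLayoutSketch {
      public static void main(String[] args) throws Exception {
        BloomFilter filter = new BlockSplitBloomFilter(1024);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        filter.writeTo(out);
        ByteBuffer buf = ByteBuffer.wrap(out.toByteArray()).order(ByteOrder.LITTLE_ENDIAN);
        System.out.println(buf.getInt()); // 1024: bitset length in bytes
        System.out.println(buf.getInt()); // 0: HashStrategy.MURMUR3_X64_128
        System.out.println(buf.getInt()); // 0: Algorithm.BLOCK
      }
    }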
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
new file mode 100644
index 000000000..3373bc1a0
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReadStore.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Contains the Bloom filter readers for all columns of a row group.
+ */
+public interface BloomFilterReadStore {
+  /**
+   * Get a Bloom filter reader of a column
+   *
+   * @param path the descriptor of the column
+   * @return the corresponding Bloom filter reader
+   */
+  BloomFilterReader getBloomFilterReader(ColumnDescriptor path);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
new file mode 100644
index 000000000..7a430581d
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterReader.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+public interface BloomFilterReader {
+  /**
+   * Returns a {@link BloomFilter} for the given column descriptor.
+   *
+   * @param path the descriptor of the column
+   * @return the Bloom filter data for that column, or null if there isn't one
+   */
+  BloomFilter readBloomFilter(ColumnDescriptor path);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
new file mode 100644
index 000000000..f7e28fdf2
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Contains all writers for all columns of a row group
+ */
+public interface BloomFilterWriteStore {
+  /**
+   * Get bloom filter writer of a column
+   *
+   * @param path the descriptor for the column
+   * @return the corresponding Bloom filter writer
+   */
+  BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
new file mode 100644
index 000000000..0fab73b2a
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+public interface BloomFilterWriter {
+  /**
+   * Write a Bloom filter
+   *
+   * @param bloomFilter the Bloom filter to write
+   *
+  void writeBloomFilter(BloomFilter bloomFilter);
+}
+
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
new file mode 100644
index 000000000..8dbb0ba19
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.parquet.column.values.RandomStr;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockSplitBloomFilter {
+  @Test
+  public void testConstructor() throws IOException {
+    BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0);
+    assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.MINIMUM_BYTES);
+    BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.MAXIMUM_BYTES + 1);
+    assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.MAXIMUM_BYTES);
+    BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000);
+    assertEquals(bloomFilter3.getBitsetSize(), 1024);
+  }
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  /*
+   * This test is used to test basic operations including inserting, finding and
+   * serializing and de-serializing.
+   */
+  @Test
+  public void testBasic() throws IOException {
+    final String[] testStrings = {"hello", "parquet", "bloom", "filter"};
+    BloomFilter bloomFilter = new BlockSplitBloomFilter(1024);
+
+    for(int i = 0; i < testStrings.length; i++) {
+      bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(testStrings[i])));
+    }
+
+    File testFile = temp.newFile();
+    FileOutputStream fileOutputStream = new FileOutputStream(testFile);
+    bloomFilter.writeTo(fileOutputStream);
+    fileOutputStream.close();
+    FileInputStream fileInputStream = new FileInputStream(testFile);
+
+    byte[] value = new byte[4];
+    fileInputStream.read(value);
+    int length = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(length, 1024);
+
+    fileInputStream.read(value);
+    int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(hash, BloomFilter.HashStrategy.MURMUR3_X64_128.ordinal());
+
+    fileInputStream.read(value);
+    int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal());
+
+    byte[] bitset = new byte[length];
+    fileInputStream.read(bitset);
+    bloomFilter = new BlockSplitBloomFilter(bitset);
+    for(int i = 0; i < testStrings.length; i++) {
+      assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testStrings[i]))));
+    }
+  }
+
+  @Test
+  public void testFPP() throws IOException {
+    final int totalCount = 100000;
+    final double FPP = 0.01;
+    final long SEED = 104729;
+
+    BloomFilter bloomFilter = new BlockSplitBloomFilter(BlockSplitBloomFilter.optimalNumOfBits(totalCount, FPP) / 8);
+    List<String> strings = new ArrayList<>();
+    RandomStr randomStr = new RandomStr(new Random(SEED));
+    for(int i = 0; i < totalCount; i++) {
+      String str = randomStr.get(10);
+      strings.add(str);
+      bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(str)));
+    }
+
+    // exist counts the number of times findHash returns true.
+    int exist = 0;
+    for (int i = 0; i < totalCount; i++) {
+      String str = randomStr.get(8);
+      if (bloomFilter.findHash(bloomFilter.hash(Binary.fromString(str)))) {
+        exist++;
+      }
+    }
+
+    // With FPP = 0.01, roughly 1% of the 100000 probes (about 1000) should be false positives.
+    assertTrue(exist < totalCount * FPP);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index b9c8996f0..2ede5c483 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -457,6 +457,7 @@ private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> rowGrou
           columnMetaData.getTotalSize(),
           columnMetaData.getFirstDataPageOffset());
       columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
+      columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
       if (!columnMetaData.getStatistics().isEmpty()) {
         columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics()));
       }
@@ -1185,6 +1186,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws
                   messageType.getType(path.toArray()).asPrimitiveType()),
               metaData.data_page_offset,
               metaData.dictionary_page_offset,
+              metaData.bloom_filter_offset,
               metaData.num_values,
               metaData.total_compressed_size,
               metaData.total_uncompressed_size);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
new file mode 100644
index 000000000..96e258fe4
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterDataReader.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.Strings;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * A {@link BloomFilterReader} implementation that reads Bloom filter data from
+ * an open {@link ParquetFileReader}.
+ */
+public class BloomFilterDataReader implements BloomFilterReader {
+  private final ParquetFileReader reader;
+  private final Map<String, ColumnChunkMetaData> columns;
+  private final Map<String, BloomFilter> cache = new HashMap<>();
+
+  public BloomFilterDataReader(ParquetFileReader fileReader, BlockMetaData block) {
+    this.reader = fileReader;
+    this.columns = new HashMap<>();
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      columns.put(column.getPath().toDotString(), column);
+    }
+  }
+  @Override
+  public BloomFilter readBloomFilter(ColumnDescriptor descriptor) {
+    String dotPath = Strings.join(descriptor.getPath(), ".");
+    ColumnChunkMetaData column = columns.get(dotPath);
+    if (column == null) {
+      throw new ParquetDecodingException(
+        "Cannot load Bloom filter data, unknown column: " + dotPath);
+    }
+    try {
+      // Guard all cache access with the lock: a plain HashMap is not safe
+      // to read while another thread may be writing to it.
+      synchronized (cache) {
+        BloomFilter bloomFilter = cache.get(dotPath);
+        if (bloomFilter == null) {
+          bloomFilter = reader.readBloomFilter(column);
+          if (bloomFilter == null) return null;
+          cache.put(dotPath, bloomFilter);
+        }
+        return bloomFilter;
+      }
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+        "Failed to read Bloom filter data for column: " + dotPath, e);
+    }
+  }
+}
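
For context, a minimal sketch of how a caller might consult this reader to prune a row group. The setup (an open ParquetFileReader named reader, its footer, and the file schema) is assumed to be in scope; "foo" and "needle" are placeholder names:

    // Hypothetical pruning check: a negative Bloom filter lookup proves the
    // value is absent from the row group, so the whole group can be skipped.
    BlockMetaData block = footer.getBlocks().get(0);
    BloomFilterReader bloomReader = reader.getBloomFilterDataReader(block);
    ColumnDescriptor col = schema.getColumnDescription(new String[] {"foo"});
    BloomFilter filter = bloomReader.readBloomFilter(col);
    if (filter != null && !filter.findHash(filter.hash(Binary.fromString("needle")))) {
      // "needle" definitely does not occur in this row group.
    }
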
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 01867c679..7fe0e410e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -33,6 +33,8 @@
 import java.io.IOException;
 import java.io.SequenceInputStream;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -69,6 +71,8 @@
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1045,6 +1049,46 @@ private DictionaryPage readCompressedDictionary(
         converter.getEncoding(dictHeader.getEncoding()));
   }
 
+  /**
+   * @param block the row group whose Bloom filters should be made readable
+   * @return a BloomFilterDataReader backed by this file reader
+   */
+  public BloomFilterDataReader getBloomFilterDataReader(BlockMetaData block) {
+    return new BloomFilterDataReader(this, block);
+  }
+
+  /**
+   * Reads Bloom filter data for the given column chunk.
+   *
+   * @param meta a column's ColumnChunkMetaData to read the Bloom filter from
+   * @return a BloomFilter object, or null if none can be decoded
+   * @throws IOException if there is an error while reading the Bloom filter.
+   */
+  public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
+    long bloomFilterOffset = meta.getBloomFilterOffset();
+    if (bloomFilterOffset <= 0) {
+      // Chunks written without a Bloom filter carry an offset of 0.
+      return null;
+    }
+    f.seek(bloomFilterOffset);
+
+    // Read the Bloom filter data header; readFully guards against short reads.
+    byte[] bytes = new byte[BlockSplitBloomFilter.HEADER_SIZE];
+    f.readFully(bytes);
+    ByteBuffer bloomHeader = ByteBuffer.wrap(bytes);
+    IntBuffer headerBuffer = bloomHeader.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+    int numBytes = headerBuffer.get();
+    if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.MAXIMUM_BYTES) {
+      return null;
+    }
+
+    // Validate ordinals before indexing into values(): a corrupt header would
+    // otherwise throw ArrayIndexOutOfBoundsException instead of returning null.
+    int hashIndex = headerBuffer.get();
+    if (hashIndex < 0 || hashIndex >= BloomFilter.HashStrategy.values().length
+      || BloomFilter.HashStrategy.values()[hashIndex] != BlockSplitBloomFilter.HashStrategy.MURMUR3_X64_128) {
+      return null;
+    }
+
+    int algorithmIndex = headerBuffer.get();
+    if (algorithmIndex < 0 || algorithmIndex >= BloomFilter.Algorithm.values().length
+      || BloomFilter.Algorithm.values()[algorithmIndex] != BlockSplitBloomFilter.Algorithm.BLOCK) {
+      return null;
+    }
+
+    byte[] bitset = new byte[numBytes];
+    f.readFully(bitset);
+    return new BlockSplitBloomFilter(bitset);
+  }
+
   /**
    * @param column
    *          the column chunk which the column index is to be returned for
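
For orientation, readBloomFilter above expects the on-disk data to begin with a little-endian header of three 32-bit values (bitset length in bytes, hash strategy ordinal, algorithm ordinal), followed by the raw bitset. A standalone sketch of that decode, assuming headerBytes has already been read from the stream:

    // Decode the Bloom filter header the same way readBloomFilter does.
    ByteBuffer header = ByteBuffer.wrap(headerBytes).order(ByteOrder.LITTLE_ENDIAN);
    int numBytes = header.getInt();         // length of the bitset that follows
    int hashOrdinal = header.getInt();      // expected: MURMUR3_X64_128
    int algorithmOrdinal = header.getInt(); // expected: BLOCK
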
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 20efe4757..1fc2c1360 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -61,6 +61,7 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -150,6 +151,7 @@
   private long currentChunkValueCount;            // set in startColumn
   private long currentChunkFirstDataPage;         // set in startColumn (out.pos())
   private long currentChunkDictionaryPageOffset;  // set in writeDictionaryPage
+  private long currentChunkBloomFilterDataOffset; // set in writeBloomFilter
 
   // set when end is called
   private ParquetMetadata footer = null;
@@ -408,6 +410,16 @@ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOExceptio
     currentEncodings.add(dictionaryPage.getEncoding());
   }
 
+  /**
+   * Writes a Bloom filter for the current column chunk and records its offset.
+   * @param bloomFilter the Bloom filter built from the column's values
+   * @throws IOException if there is an error while writing
+   */
+  public void writeBloomFilter(BloomFilter bloomFilter) throws IOException {
+    state = state.write();
+    currentChunkBloomFilterDataOffset = out.getPos();
+    bloomFilter.writeTo(out);
+  }
 
   /**
    * writes a single page
@@ -626,6 +638,7 @@ public void endColumn() throws IOException {
         currentStatistics,
         currentChunkFirstDataPage,
         currentChunkDictionaryPageOffset,
+        currentChunkBloomFilterDataOffset,
         currentChunkValueCount,
         compressedLength,
         uncompressedLength));
@@ -885,6 +898,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
           chunk.getStatistics(),
           newChunkStart,
           newChunkStart,
+          chunk.getBloomFilterOffset(),
           chunk.getValueCount(),
           chunk.getTotalSize(),
           chunk.getTotalUncompressedSize()));
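
Taken together with endColumn, the intended calling sequence appears to be: write the column's data pages, call writeBloomFilter before closing the column so out.getPos() is captured, then let endColumn fold the offset into the chunk metadata. A condensed sketch, reusing the setup of the test further below (conf, schema, col, CODEC, pageBytes and stats are assumed in scope):

    ParquetFileWriter w = new ParquetFileWriter(conf, schema, path);
    w.start();
    w.startBlock(3);
    w.startColumn(col, 5, CODEC);
    w.writeDataPage(5, 4, BytesInput.from(pageBytes), stats, BIT_PACKED, BIT_PACKED, PLAIN);
    BloomFilter bf = new BlockSplitBloomFilter(0);
    bf.insertHash(bf.hash(Binary.fromString("hello")));
    w.writeBloomFilter(bf); // records the offset consumed by endColumn()
    w.endColumn();
    w.endBlock();
    w.end(new HashMap<String, String>());
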
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index b8fce2f65..4d6f42c2b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -134,6 +134,12 @@
    */
   public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled";
 
+  /**
+   * key to configure whether row-group Bloom filtering is enabled
+   */
+  public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";
+  public static final boolean BLOOM_FILTER_ENABLED_DEFAULT = false;
+
   /**
    * key to turn on or off task side metadata loading (default true)
    * if true then metadata is read on the task side and some tasks may finish immediately.
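
A hedged example of flipping the read-side switch defined above; the surrounding job setup is assumed:

    // Hypothetical: enable row-group Bloom filtering for reads (default false).
    Configuration conf = new Configuration();
    conf.setBoolean(ParquetInputFormat.BLOOM_FILTERING_ENABLED, true);
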
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 04cbd15c0..33c371537 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
 import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
 
 import java.io.IOException;
+import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -144,6 +145,9 @@
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+  public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names";
+  public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+  public static final String ENABLE_BLOOM_FILTER = "parquet.enable.bloom.filter";
   public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
@@ -210,6 +214,34 @@ public static boolean getEnableDictionary(JobContext jobContext) {
     return getEnableDictionary(getConfiguration(jobContext));
   }
 
+  public static HashMap<String, Long> getBloomFilterColumnExpectedNDVs(Configuration conf) {
+    HashMap<String, Long> kv = new HashMap<>();
+    String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+    String expectedNDVsConf = conf.get(BLOOM_FILTER_EXPECTED_NDV);
+
+    if (columnNamesConf == null || expectedNDVsConf == null) {
+      return kv;
+    }
+
+    String[] columnNames = columnNamesConf.split(",");
+    String[] expectedNDVs = expectedNDVsConf.split(",");
+
+    if (columnNames.length == expectedNDVs.length) {
+      for (int i = 0; i < columnNames.length; i++) {
+        // Long.parseLong parses the string itself; Long.getLong would instead
+        // look up a system property of that name and return null here.
+        kv.put(columnNames[i], Long.parseLong(expectedNDVs[i]));
+      }
+    } else {
+      LOG.warn("The number of Bloom filter column names does not match the number of expected NDVs");
+    }
+
+    return kv;
+  }
+
+  public static boolean getEnableBloomFilter(Configuration configuration) {
+    return configuration.getBoolean(ENABLE_BLOOM_FILTER,
+      ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED);
+  }
+
   public static int getBlockSize(JobContext jobContext) {
     return getBlockSize(getConfiguration(jobContext));
   }
@@ -388,6 +420,8 @@ private static int getPageRowCountLimit(Configuration conf) {
         .withPageSize(getPageSize(conf))
         .withDictionaryPageSize(getDictionaryPageSize(conf))
         .withDictionaryEncoding(getEnableDictionary(conf))
+        .withBloomFilterEnabled(getEnableBloomFilter(conf))
+        .withBloomFilterInfo(getBloomFilterColumnExpectedNDVs(conf))
         .withWriterVersion(getWriterVersion(conf))
         .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
@@ -412,6 +446,10 @@ private static int getPageRowCountLimit(Configuration conf) {
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+      LOG.info("Bloom Filter is {}", props.isBloomFilterEnabled()? "on": "off");
+      LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumnExpectedNDVs().keySet());
+      LOG.info("Bloom filter enabled column expected number of distinct values are: {}",
+        props.getBloomFilterColumnExpectedNDVs().values());
       LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
     }
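
Putting the three write-side keys together, a sketch of a job configuration; the column names and NDV figures are illustrative only:

    // Hypothetical write-side setup: names and expected NDVs are parallel
    // comma-separated lists, matched by position.
    Configuration conf = new Configuration();
    conf.setBoolean(ParquetOutputFormat.ENABLE_BLOOM_FILTER, true);
    conf.set(ParquetOutputFormat.BLOOM_FILTER_COLUMN_NAMES, "id,name");
    conf.set(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, "1000000,50000");
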
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index e6aa1043b..315613253 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -126,6 +126,7 @@ public static ColumnChunkMetaData get(
           statistics,
           firstDataPage,
           dictionaryPageOffset,
+          0, // no Bloom filter offset recorded for this chunk
           valueCount,
           totalSize,
           totalUncompressedSize);
@@ -136,12 +137,56 @@ public static ColumnChunkMetaData get(
           statistics,
           firstDataPage,
           dictionaryPageOffset,
+          0, // no Bloom filter offset recorded for this chunk
           valueCount,
           totalSize,
           totalUncompressedSize);
     }
   }
 
+  public static ColumnChunkMetaData get(
+    ColumnPath path,
+    PrimitiveType type,
+    CompressionCodecName codec,
+    EncodingStats encodingStats,
+    Set<Encoding> encodings,
+    Statistics statistics,
+    long firstDataPage,
+    long dictionaryPageOffset,
+    long bloomFilterDataOffset,
+    long valueCount,
+    long totalSize,
+    long totalUncompressedSize) {
+    // to save space we store those always positive longs in ints when they fit.
+    if (positiveLongFitsInAnInt(firstDataPage)
+      && positiveLongFitsInAnInt(dictionaryPageOffset)
+      && positiveLongFitsInAnInt(valueCount)
+      && positiveLongFitsInAnInt(totalSize)
+      && positiveLongFitsInAnInt(totalUncompressedSize)) {
+      return new IntColumnChunkMetaData(
+        path, type, codec,
+        encodingStats, encodings,
+        statistics,
+        firstDataPage,
+        dictionaryPageOffset,
+        bloomFilterDataOffset,
+        valueCount,
+        totalSize,
+        totalUncompressedSize);
+    } else {
+      return new LongColumnChunkMetaData(
+        path, type, codec,
+        encodingStats, encodings,
+        statistics,
+        firstDataPage,
+        dictionaryPageOffset,
+        bloomFilterDataOffset,
+        valueCount,
+        totalSize,
+        totalUncompressedSize);
+    }
+  }
+
   /**
    * @return the offset of the first byte in the chunk
    */
@@ -220,6 +265,11 @@ public PrimitiveType getPrimitiveType() {
    */
   abstract public long getDictionaryPageOffset();
 
+  /**
+   * @return the offset of the Bloom filter data, if any
+   */
+  abstract public long getBloomFilterOffset();
+
   /**
    * @return count of values in this block of the column
    */
@@ -295,6 +345,7 @@ public String toString() {
 
   private final int firstDataPage;
   private final int dictionaryPageOffset;
+  private final int bloomFilterDataOffset;
   private final int valueCount;
   private final int totalSize;
   private final int totalUncompressedSize;
@@ -321,12 +372,14 @@ public String toString() {
       Statistics statistics,
       long firstDataPage,
       long dictionaryPageOffset,
+      long bloomFilterDataOffset,
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
     super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
     this.firstDataPage = positiveLongToInt(firstDataPage);
     this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset);
+    this.bloomFilterDataOffset = positiveLongToInt(bloomFilterDataOffset);
     this.valueCount = positiveLongToInt(valueCount);
     this.totalSize = positiveLongToInt(totalSize);
     this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize);
@@ -368,6 +421,13 @@ public long getDictionaryPageOffset() {
     return intToPositiveLong(dictionaryPageOffset);
   }
 
+  /**
+   * @return the offset of the Bloom filter data, if any
+   */
+  public long getBloomFilterOffset() {
+    return intToPositiveLong(bloomFilterDataOffset);
+  }
+
   /**
    * @return count of values in this block of the column
    */
@@ -400,6 +460,7 @@ public Statistics getStatistics() {
 
   private final long firstDataPageOffset;
   private final long dictionaryPageOffset;
+  private final long bloomFilterDataOffset;
   private final long valueCount;
   private final long totalSize;
   private final long totalUncompressedSize;
@@ -426,12 +487,14 @@ public Statistics getStatistics() {
       Statistics statistics,
       long firstDataPageOffset,
       long dictionaryPageOffset,
+      long bloomFilterDataOffset,
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
     super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
     this.firstDataPageOffset = firstDataPageOffset;
     this.dictionaryPageOffset = dictionaryPageOffset;
+    this.bloomFilterDataOffset = bloomFilterDataOffset;
     this.valueCount = valueCount;
     this.totalSize = totalSize;
     this.totalUncompressedSize = totalUncompressedSize;
@@ -452,6 +515,13 @@ public long getDictionaryPageOffset() {
     return dictionaryPageOffset;
   }
 
+  /**
+   * @return the offset of the Bloom filter data, if any
+   */
+  public long getBloomFilterOffset() {
+    return bloomFilterDataOffset;
+  }
+
   /**
    * @return count of values in this block of the column
    */
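
For readers unfamiliar with the Int/Long split above: the Int variant saves footer memory by packing always-positive longs into ints. A sketch of the presumed encoding (the real positiveLongFitsInAnInt/positiveLongToInt/intToPositiveLong helpers live in this class; this is an illustration, not the committed code):

    // Any long in [0, 2^32) round-trips through an int by shifting the range
    // down by Integer.MIN_VALUE.
    static boolean positiveLongFitsInAnInt(long value) {
      return value >= 0 && value + Integer.MIN_VALUE <= Integer.MAX_VALUE;
    }
    static int positiveLongToInt(long value) {
      return (int) (value + Integer.MIN_VALUE);
    }
    static long intToPositiveLong(int value) {
      return (long) value - Integer.MIN_VALUE;
    }
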
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 917ad5791..0cfb001d4 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,9 @@
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterReader;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -219,6 +222,42 @@ public void testWriteRead() throws Exception {
     PrintFooter.main(new String[] {path.toString()});
   }
 
+  @Test
+  public void testBloomWriteRead() throws Exception {
+    MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
+    File testFile = temp.newFile();
+    testFile.delete();
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+    configuration.set("parquet.bloomFilter.filter.column.names", "foo");
+    String colPath[] = {"foo"};
+    ColumnDescriptor col = schema.getColumnDescription(colPath);
+    BinaryStatistics stats1 = new BinaryStatistics();
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(col, 5, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    BloomFilter bloomData = new BlockSplitBloomFilter(0);
+    bloomData.insertHash(bloomData.hash(Binary.fromString("hello")));
+    bloomData.insertHash(bloomData.hash(Binary.fromString("world")));
+    long blStarts = w.getPos();
+    w.writeBloomFilter(bloomData);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap<String, String>());
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    assertEquals("bloomFilter offset",
+      blStarts, readFooter.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
+    ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+      Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
+    BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+    BloomFilter bloomDataRead = bloomFilterReader.readBloomFilter(col);
+    assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("hello"))));
+    assertTrue(bloomDataRead.findHash(bloomData.hash(Binary.fromString("world"))));
+  }
+
   @Test
   public void testAlignmentWithPadding() throws Exception {
     File testFile = temp.newFile();
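
One caveat on the assertions in testBloomWriteRead above: findHash can return a false positive but never a false negative, so only the "must contain" direction is deterministic. A hypothetical illustration:

    BloomFilter bf = new BlockSplitBloomFilter(0);
    bf.insertHash(bf.hash(Binary.fromString("hello")));
    boolean mustBeTrue = bf.findHash(bf.hash(Binary.fromString("hello")));     // always true
    boolean probablyFalse = bf.findHash(bf.hash(Binary.fromString("absent"))); // rarely true
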
diff --git a/pom.xml b/pom.xml
index 9324a1a4b..1189afb5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
     <hadoop1.version>1.2.1</hadoop1.version>
     <cascading.version>2.7.1</cascading.version>
     <cascading3.version>3.1.2</cascading3.version>
-    <parquet.format.version>2.6.0</parquet.format.version>
+    <parquet.format.version>2.7.0-SNAPSHOT</parquet.format.version>
     <previous.version>1.7.0</previous.version>
     <thrift.executable>thrift</thrift.executable>
     <format.thrift.executable>thrift</format.thrift.executable>


With regards,
Apache Git Services
