parquet-commits mailing list archives

From ga...@apache.org
Subject [parquet-mr] branch bloom-filter updated: PARQUET-1516: Store Bloom filters near to footer (#608)
Date Tue, 12 Feb 2019 09:28:31 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch bloom-filter
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/bloom-filter by this push:
     new 96c2fef  PARQUET-1516: Store Bloom filters near to footer (#608)
96c2fef is described below

commit 96c2fef80c8d433cf2e247e28f3af07562a8065e
Author: Chen, Junjie <jimmyjchen@tencent.com>
AuthorDate: Tue Feb 12 17:28:25 2019 +0800

    PARQUET-1516: Store Bloom filters near to footer (#608)
    
    * PARQUET-1516: Store Bloom filters near to footer
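
In short, this change moves Bloom filter serialization out of the column-chunk
write path: ParquetFileWriter.writeBloomFilter(BloomFilter) no longer writes a
filter at the stream's current position. Filters are buffered per row group
(currentBloomFilters), and end() flushes them after all row-group data,
immediately before the footer, recording each filter's position via
ColumnChunkMetaData.setBloomFilterOffset(). Below is a minimal, self-contained
Java sketch of that buffer-then-flush pattern; every name in it is
illustrative, and none of it is parquet-mr API.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BloomNearFooterSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    List<byte[]> bufferedFilters = new ArrayList<>(); // plays the role of currentBloomFilters
    List<Long> offsets = new ArrayList<>();           // offsets later recorded in column metadata

    out.write(new byte[] { 1, 2, 3 });                // row-group column data
    bufferedFilters.add(new byte[] { 42, 43 });       // filter is buffered, not written inline

    // The end() step: flush the buffered filters just before the footer,
    // remembering where each one landed so the footer can point at it.
    for (byte[] filter : bufferedFilters) {
      offsets.add((long) out.size());
      out.write(filter);
    }
    out.write(new byte[] { 0 });                      // footer placeholder
    System.out.println("Bloom filter offsets: " + offsets);
  }
}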
---
 .../format/converter/ParquetMetadataConverter.java |  2 +-
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  | 10 ++-
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 59 ++++++++++----
 .../hadoop/metadata/ColumnChunkMetaData.java       | 89 +++++-----------------
 .../hadoop/TestColumnChunkPageWriteStore.java      |  1 +
 .../parquet/hadoop/TestParquetFileWriter.java      |  5 +-
 6 files changed, 77 insertions(+), 89 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index fcefe3c..945b83d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1192,12 +1192,12 @@ public class ParquetMetadataConverter {
                   messageType.getType(path.toArray()).asPrimitiveType()),
               metaData.data_page_offset,
               metaData.dictionary_page_offset,
-              metaData.bloom_filter_offset,
               metaData.num_values,
               metaData.total_compressed_size,
               metaData.total_uncompressed_size);
           column.setColumnIndexReference(toColumnIndexReference(columnChunk));
           column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
+          column.setBloomFilterOffset(metaData.bloom_filter_offset);
           // TODO
           // index_page_offset
           // key_value_metadata
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index f87630b..1a607d4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -35,6 +35,8 @@ import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
@@ -50,7 +52,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
-  private static final class ColumnChunkPageWriter implements PageWriter {
+  private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {
 
     private final ColumnDescriptor path;
     private final BytesCompressor compressor;
@@ -69,6 +71,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
     private Set<Encoding> dlEncodings = new HashSet<Encoding>();
     private List<Encoding> dataEncodings = new ArrayList<Encoding>();
 
+    private BloomFilter bloomFilter;
     private ColumnIndexBuilder columnIndexBuilder;
     private OffsetIndexBuilder offsetIndexBuilder;
     private Statistics totalStatistics;
@@ -227,6 +230,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
           totalStatistics,
           columnIndexBuilder,
           offsetIndexBuilder,
+          bloomFilter,
           rlEncodings,
           dlEncodings,
           dataEncodings);
@@ -267,6 +271,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
       return buf.memUsageString(prefix + " ColumnChunkPageWriter");
     }
 
+    @Override
+    public void writeBloomFilter(BloomFilter bloomFilter) {
+      this.bloomFilter = bloomFilter;
+    }
   }
 
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 1fc2c13..764f519 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -124,6 +124,9 @@ public class ParquetFileWriter {
   private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
   private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
 
+  // The Bloom filters
+  private final List<List<BloomFilter>> bloomFilters = new ArrayList<>();
+
   // row group data
   private BlockMetaData currentBlock; // appended to by endColumn
 
@@ -131,6 +134,9 @@ public class ParquetFileWriter {
   private List<ColumnIndex> currentColumnIndexes;
   private List<OffsetIndex> currentOffsetIndexes;
 
+  // The Bloom filters for the current block
+  private List<BloomFilter> currentBloomFilters;
+
   // row group data set at the start of a row group
   private long currentRecordCount; // set in startBlock
 
@@ -151,7 +157,6 @@ public class ParquetFileWriter {
   private long currentChunkValueCount;            // set in startColumn
   private long currentChunkFirstDataPage;         // set in startColumn (out.pos())
   private long currentChunkDictionaryPageOffset;  // set in writeDictionaryPage
-  private long currentChunkBloomFilterDataOffset; // set in writeBloomData
 
   // set when end is called
   private ParquetMetadata footer = null;
@@ -354,6 +359,8 @@ public class ParquetFileWriter {
 
     currentColumnIndexes = new ArrayList<>();
     currentOffsetIndexes = new ArrayList<>();
+
+    currentBloomFilters = new ArrayList<>();
   }
 
   /**
@@ -410,16 +417,6 @@ public class ParquetFileWriter {
     currentEncodings.add(dictionaryPage.getEncoding());
   }
 
-  /**
-   * Write a Bloom filter
-   * @param bloomFilter the bloom filter of column values
-   * @throws IOException if there is an error while writing
-   */
-  public void writeBloomFilter(BloomFilter bloomFilter) throws IOException {
-    state = state.write();
-    currentChunkBloomFilterDataOffset = out.getPos();
-    bloomFilter.writeTo(out);
-  }
 
   /**
    * writes a single page
@@ -559,6 +556,14 @@ public class ParquetFileWriter {
   }
 
   /**
+   * Write a Bloom filter
+   * @param bloomFilter the bloom filter of column values
+   */
+  void writeBloomFilter(BloomFilter bloomFilter) {
+    currentBloomFilters.add(bloomFilter);
+  }
+
+  /**
    * Writes a column chunk at once
    * @param descriptor the descriptor of the column
    * @param valueCount the value count in this column
@@ -570,6 +575,7 @@ public class ParquetFileWriter {
    * @param totalStats accumulated statistics for the column chunk
    * @param columnIndexBuilder the builder object for the column index
    * @param offsetIndexBuilder the builder object for the offset index
+   * @param bloomFilter the bloom filter for this column
    * @param rlEncodings the RL encodings used in this column chunk
    * @param dlEncodings the DL encodings used in this column chunk
    * @param dataEncodings the data encodings used in this column chunk
@@ -585,14 +591,18 @@ public class ParquetFileWriter {
       Statistics<?> totalStats,
       ColumnIndexBuilder columnIndexBuilder,
       OffsetIndexBuilder offsetIndexBuilder,
+      BloomFilter bloomFilter,
       Set<Encoding> rlEncodings,
       Set<Encoding> dlEncodings,
       List<Encoding> dataEncodings) throws IOException {
     startColumn(descriptor, valueCount, compressionCodecName);
 
     state = state.write();
+
     if (dictionaryPage != null) {
       writeDictionaryPage(dictionaryPage);
+    } else if (bloomFilter != null) {
+      currentBloomFilters.add(bloomFilter);
     }
     LOG.debug("{}: write data pages", out.getPos());
     long headersSize = bytes.size() - compressedTotalPageSize;
@@ -638,7 +648,6 @@ public class ParquetFileWriter {
         currentStatistics,
         currentChunkFirstDataPage,
         currentChunkDictionaryPageOffset,
-        currentChunkBloomFilterDataOffset,
         currentChunkValueCount,
         compressedLength,
         uncompressedLength));
@@ -660,8 +669,10 @@ public class ParquetFileWriter {
     blocks.add(currentBlock);
     columnIndexes.add(currentColumnIndexes);
     offsetIndexes.add(currentOffsetIndexes);
+    bloomFilters.add(currentBloomFilters);
     currentColumnIndexes = null;
     currentOffsetIndexes = null;
+    currentBloomFilters = null;
     currentBlock = null;
   }
 
@@ -898,7 +909,6 @@ public class ParquetFileWriter {
           chunk.getStatistics(),
           newChunkStart,
           newChunkStart,
-          chunk.getBloomFilterOffset(),
           chunk.getValueCount(),
           chunk.getTotalSize(),
           chunk.getTotalUncompressedSize()));
@@ -958,6 +968,7 @@ public class ParquetFileWriter {
     state = state.end();
     serializeColumnIndexes(columnIndexes, blocks, out);
     serializeOffsetIndexes(offsetIndexes, blocks, out);
+    serializeBloomFilters(bloomFilters, blocks, out);
     LOG.debug("{}: end", out.getPos());
     this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
     serializeFooter(footer, out);
@@ -1007,6 +1018,28 @@ public class ParquetFileWriter {
     }
   }
 
+  private static void serializeBloomFilters(
+    List<List<BloomFilter>> bloomFilters,
+    List<BlockMetaData> blocks,
+    PositionOutputStream out) throws IOException {
+    LOG.debug("{}: bloom filters", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      List<BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
+      if (blockBloomFilters.isEmpty()) continue;
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        BloomFilter bloomFilter = blockBloomFilters.get(cIndex);
+        if (bloomFilter == null) {
+          continue;
+        }
+        ColumnChunkMetaData column = columns.get(cIndex);
+        long offset = out.getPos();
+        column.setBloomFilterOffset(offset);
+        bloomFilter.writeTo(out);
+      }
+    }
+  }
+
   private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
     long footerIndex = out.getPos();
     org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 3156132..2c24356 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -126,7 +126,6 @@ abstract public class ColumnChunkMetaData {
           statistics,
           firstDataPage,
           dictionaryPageOffset,
-          0,
           valueCount,
           totalSize,
           totalUncompressedSize);
@@ -137,56 +136,12 @@ abstract public class ColumnChunkMetaData {
           statistics,
           firstDataPage,
           dictionaryPageOffset,
-          0,
           valueCount,
           totalSize,
           totalUncompressedSize);
     }
   }
 
-  public static ColumnChunkMetaData get(
-    ColumnPath path,
-    PrimitiveType type,
-    CompressionCodecName codec,
-    EncodingStats encodingStats,
-    Set<Encoding> encodings,
-    Statistics statistics,
-    long firstDataPage,
-    long dictionaryPageOffset,
-    long bloomFilterDataOffset,
-    long valueCount,
-    long totalSize,
-    long totalUncompressedSize) {
-    // to save space we store those always positive longs in ints when they fit.
-    if (positiveLongFitsInAnInt(firstDataPage)
-      && positiveLongFitsInAnInt(dictionaryPageOffset)
-      && positiveLongFitsInAnInt(valueCount)
-      && positiveLongFitsInAnInt(totalSize)
-      && positiveLongFitsInAnInt(totalUncompressedSize)) {
-      return new IntColumnChunkMetaData(
-        path, type, codec,
-        encodingStats, encodings,
-        statistics,
-        firstDataPage,
-        dictionaryPageOffset,
-        bloomFilterDataOffset,
-        valueCount,
-        totalSize,
-        totalUncompressedSize);
-    } else {
-      return new LongColumnChunkMetaData(
-        path, type, codec,
-        encodingStats, encodings,
-        statistics,
-        firstDataPage,
-        dictionaryPageOffset,
-        bloomFilterDataOffset,
-        valueCount,
-        totalSize,
-        totalUncompressedSize);
-    }
-  }
-
   /**
    * @return the offset of the first byte in the chunk
    */
@@ -218,6 +173,8 @@ abstract public class ColumnChunkMetaData {
   private IndexReference columnIndexReference;
   private IndexReference offsetIndexReference;
 
+  private long bloomFilterOffset;
+
   protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
     this(null, columnChunkProperties);
   }
@@ -266,11 +223,6 @@ abstract public class ColumnChunkMetaData {
   abstract public long getDictionaryPageOffset();
 
   /**
-   * @return the location of the bloomFilter filter data if any
-   */
-  abstract public long getBloomFilterOffset();
-
-  /**
    * @return count of values in this block of the column
    */
   abstract public long getValueCount();
@@ -325,6 +277,23 @@ abstract public class ColumnChunkMetaData {
   }
 
   /**
+   * @param bloomFilterOffset
+   *          the reference to the Bloom filter
+   */
+  @Private
+  public void setBloomFilterOffset(long bloomFilterOffset) {
+    this.bloomFilterOffset = bloomFilterOffset;
+  }
+
+  /**
+   * @return the offset to the Bloom filter
+   */
+  @Private
+  public long getBloomFilterOffset() {
+    return bloomFilterOffset;
+  }
+
+  /**
    * @return all the encodings used in this column
    */
   public Set<Encoding> getEncodings() {
@@ -345,7 +314,6 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
 
   private final int firstDataPage;
   private final int dictionaryPageOffset;
-  private final int bloomFilterDataOffset;
   private final int valueCount;
   private final int totalSize;
   private final int totalUncompressedSize;
@@ -372,14 +340,12 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
       Statistics statistics,
       long firstDataPage,
       long dictionaryPageOffset,
-      long bloomFilterDataOffset,
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
     super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
     this.firstDataPage = positiveLongToInt(firstDataPage);
     this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset);
-    this.bloomFilterDataOffset = positiveLongToInt(bloomFilterDataOffset);
     this.valueCount = positiveLongToInt(valueCount);
     this.totalSize = positiveLongToInt(totalSize);
     this.totalUncompressedSize = positiveLongToInt(totalUncompressedSize);
@@ -422,13 +388,6 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData {
   }
 
   /**
-   * @return the location of bloom filter if any
-   */
-  public long getBloomFilterOffset() {
-    return intToPositiveLong(bloomFilterDataOffset);
-  }
-
-  /**
    * @return count of values in this block of the column
    */
   public long getValueCount() {
@@ -460,7 +419,6 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
 
   private final long firstDataPageOffset;
   private final long dictionaryPageOffset;
-  private final long bloomFilterDataOffset;
   private final long valueCount;
   private final long totalSize;
   private final long totalUncompressedSize;
@@ -487,14 +445,12 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
       Statistics statistics,
       long firstDataPageOffset,
       long dictionaryPageOffset,
-      long bloomFilterDataOffset,
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
     super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings));
     this.firstDataPageOffset = firstDataPageOffset;
     this.dictionaryPageOffset = dictionaryPageOffset;
-    this.bloomFilterDataOffset = bloomFilterDataOffset;
     this.valueCount = valueCount;
     this.totalSize = totalSize;
     this.totalUncompressedSize = totalUncompressedSize;
@@ -516,13 +472,6 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData {
   }
 
   /**
-   * @return the location of the bloom filter if any
-   */
-  public long getBloomFilterOffset() {
-    return bloomFilterDataOffset;
-  }
-
-  /**
    * @return count of values in this block of the column
    */
   public long getValueCount() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index c353ee3..fc37717 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -259,6 +259,7 @@ public class TestColumnChunkPageWriteStore {
           same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index
           any(),
           any(),
+          any(),
           any());
     }
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 0cfb001..71ca5ea 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -239,17 +239,14 @@ public class TestParquetFileWriter {
     w.startColumn(col, 5, CODEC);
     w.writeDataPage(2, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
     w.writeDataPage(3, 4, BytesInput.from(BYTES1),stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
     BloomFilter bloomData = new BlockSplitBloomFilter(0);
     bloomData.insertHash(bloomData.hash(Binary.fromString("hello")));
     bloomData.insertHash(bloomData.hash(Binary.fromString("world")));
-    long blStarts = w.getPos();
     w.writeBloomFilter(bloomData);
-    w.endColumn();
     w.endBlock();
     w.end(new HashMap<String, String>());
     ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
-    assertEquals("bloomFilter offset",
-      blStarts, readFooter.getBlocks().get(0).getColumns().get(0).getBloomFilterOffset());
     ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
       Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
     BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
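
On the read side, the offset a consumer needs now lives in the column metadata
(set during serializeBloomFilters above) instead of being captured at write
time. The following is a hedged sketch of that read path using only calls that
appear in this commit; the BloomFilterReader lookup API itself is specific to
the bloom-filter branch and is not shown here, so the sketch stops at obtaining
the reader.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

public class BloomFilterOffsetInspector {
  static void inspect(Configuration conf, Path path) throws IOException {
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
    BlockMetaData block = footer.getBlocks().get(0);
    ColumnChunkMetaData column = block.getColumns().get(0);

    // Populated by serializeBloomFilters() while writing near the footer.
    long offset = column.getBloomFilterOffset();
    System.out.println("Bloom filter stored at offset " + offset);

    ParquetFileReader reader = new ParquetFileReader(conf, footer.getFileMetaData(), path,
        Arrays.asList(block), footer.getFileMetaData().getSchema().getColumns());
    try {
      // Same entry point the test above uses; further lookup API is branch-specific.
      reader.getBloomFilterDataReader(block);
    } finally {
      reader.close();
    }
  }
}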

