parquet-commits mailing list archives

From ga...@apache.org
Subject [parquet-mr] branch master updated: PARQUET-1414: Limit page size based on maximum row count (#531)
Date Wed, 07 Nov 2018 08:44:11 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e0760a  PARQUET-1414: Limit page size based on maximum row count (#531)
1e0760a is described below

commit 1e0760a1f9c138e3cb66143f1c9fdf8ee2e8eef7
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Wed Nov 7 09:44:06 2018 +0100

    PARQUET-1414: Limit page size based on maximum row count (#531)
---
 .../apache/parquet/column/ParquetProperties.java   | 21 ++++++-
 .../parquet/column/impl/ColumnWriteStoreBase.java  | 15 ++++-
 .../apache/parquet/column/mem/TestMemColumn.java   | 71 ++++++++++++++++++++++
 .../apache/parquet/hadoop/ParquetOutputFormat.java | 15 +++++
 .../org/apache/parquet/hadoop/ParquetWriter.java   | 11 ++++
 5 files changed, 128 insertions(+), 5 deletions(-)
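
For context, a minimal sketch of how the new limit can be set through the
ParquetProperties builder added in this commit. The class name
PageRowCountLimitProps and the value 5_000 are illustrative; the default is
DEFAULT_PAGE_ROW_COUNT_LIMIT (20_000), as defined in the first hunk below:

    import org.apache.parquet.column.ParquetProperties;

    public class PageRowCountLimitProps {
      public static void main(String[] args) {
        // Builds writer properties with a custom page row count limit.
        // withPageRowCountLimit rejects non-positive values (see the
        // Preconditions check in the diff below).
        ParquetProperties props = ParquetProperties.builder()
            .withPageRowCountLimit(5_000) // illustrative; default is 20_000
            .build();
        System.out.println(props.getPageRowCountLimit()); // prints 5000
      }
    }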

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index b173239..41e482c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -48,6 +48,7 @@ public class ParquetProperties {
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
+  public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
 
   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
 
@@ -85,10 +86,11 @@ public class ParquetProperties {
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
+  private final int pageRowCountLimit;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -102,6 +104,7 @@ public class ParquetProperties {
 
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
+    this.pageRowCountLimit = pageRowCountLimit;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -194,6 +197,10 @@ public class ParquetProperties {
     return estimateNextSizeCheck;
   }
 
+  public int getPageRowCountLimit() {
+    return pageRowCountLimit;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -213,18 +220,22 @@ public class ParquetProperties {
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+    private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
 
     private Builder() {
     }
 
     private Builder(ParquetProperties toCopy) {
+      this.pageSize = toCopy.pageSizeThreshold;
       this.enableDict = toCopy.enableDictionary;
       this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
       this.writerVersion = toCopy.writerVersion;
       this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
       this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
       this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
+      this.valuesWriterFactory = toCopy.valuesWriterFactory;
       this.allocator = toCopy.allocator;
+      this.pageRowCountLimit = toCopy.pageRowCountLimit;
     }
 
     /**
@@ -313,11 +324,17 @@ public class ParquetProperties {
       return this;
     }
 
+    public Builder withPageRowCountLimit(int rowCount) {
+      Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
+      pageRowCountLimit = rowCount;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties =
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
+          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 5cd7d87..f79c09d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -67,7 +67,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
 
     this.columns = new TreeMap<>();
 
-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
 
     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -95,7 +95,7 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
     }
     this.columns = unmodifiableMap(mcolumns);
 
-    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
 
     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -190,13 +190,17 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
 
   private void sizeCheck() {
     long minRecordToWait = Long.MAX_VALUE;
+    int pageRowCountLimit = props.getPageRowCountLimit();
+    long rowCountForNextRowCountCheck = rowCount + pageRowCountLimit;
     for (ColumnWriterBase writer : columns.values()) {
       long usedMem = writer.getCurrentPageBufferedSize();
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
-      if (remainingMem <= thresholdTolerance) {
+      if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) {
         writer.writePage();
         remainingMem = props.getPageSizeThreshold();
+      } else {
+        rowCountForNextRowCountCheck = min(rowCountForNextRowCountCheck, rowCount + (pageRowCountLimit - rows));
       }
       long rowsToFillPage =
           usedMem == 0 ?
@@ -219,5 +223,10 @@ abstract class ColumnWriteStoreBase implements ColumnWriteStore {
     } else {
       rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
     }
+
+    // Do the check earlier if required to keep the row count limit
+    if (rowCountForNextRowCountCheck < rowCountForNextSizeCheck) {
+      rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
+    }
   }
 }
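
A worked illustration of the scheduling arithmetic above, with made-up
numbers (rowCount, rows, and the limit are illustrative; the real values come
from the store and its column writers):

    public class RowCountCheckMath {
      public static void main(String[] args) {
        long rowCount = 104;        // records written to the store so far
        int pageRowCountLimit = 10; // props.getPageRowCountLimit()
        long rows = 4;              // rowCount - writer.getRowsWrittenSoFar()
        // Mirrors the logic above: the next check happens no later than the
        // point where the fullest current page would reach the row limit.
        long nextRowCountCheck = Math.min(rowCount + pageRowCountLimit,
            rowCount + (pageRowCountLimit - rows));
        System.out.println(nextRowCountCheck); // 110, i.e. 6 rows from now
      }
    }
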
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index e5db38c..f89d0cb 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -18,19 +18,28 @@
  */
 package org.apache.parquet.column.mem;
 
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
+import org.apache.parquet.column.impl.ColumnWriteStoreV2;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -166,6 +175,68 @@ public class TestMemColumn {
     }
   }
 
+  @Test
+  public void testPageSize() {
+    MessageType schema = Types.buildMessage()
+        .requiredList().requiredElement(BINARY).named("binary_col")
+        .requiredList().requiredElement(INT32).named("int32_col")
+        .named("msg");
+    System.out.println(schema);
+    MemPageStore memPageStore = new MemPageStore(123);
+
+    // Using V2 pages so we have rowCount info
+    ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
+        .withPageSize(1024) // Less than 10 records for binary_col
+        .withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
+        .withPageRowCountLimit(10)
+        .withDictionaryEncoding(false) // Enforce having large binary_col pages
+        .build());
+    ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] { "binary_col", "list", "element" });
+    ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
+    ColumnDescriptor int32Col = schema.getColumnDescription(new String[] { "int32_col", "list", "element" });
+    ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);
+    // Writing 123 records
+    for (int i = 0; i < 123; ++i) {
+      // Writing 10 values per record
+      for (int j = 0; j < 10; ++j) {
+        binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2);
+        int32ColWriter.write(42, j == 0 ? 0 : 2, 2);
+      }
+      writeStore.endRecord();
+    }
+    writeStore.flush();
+
+    // Check that all the binary_col pages are <= 1024 bytes
+    {
+      PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
+      assertEquals(1230, binaryColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < binaryColPageReader.getTotalValueCount()) {
+        DataPage page = binaryColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(),
page.getIndexRowCount().get());
+        assertTrue("Compressed size should be less than 1024", page.getCompressedSize() <=
1024);
+      }
+    }
+
+    // Check that all the int32_col pages contain <= 10 rows
+    {
+      PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
+      assertEquals(1230, int32ColPageReader.getTotalValueCount());
+      int pageCnt = 0;
+      int valueCnt = 0;
+      while (valueCnt < int32ColPageReader.getTotalValueCount()) {
+        DataPage page = int32ColPageReader.readPage();
+        ++pageCnt;
+        valueCnt += page.getValueCount();
+        LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(),
page.getIndexRowCount().get());
+        assertTrue("Row count should be less than 10", page.getIndexRowCount().get() <=
10);
+      }
+    }
+  }
+
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
     return new ColumnWriteStoreV1(memPageStore,
         ParquetProperties.builder()
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 0789bf5..04cbd15 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -144,6 +144,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
+  public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -325,6 +326,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
   }
 
+  public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
+    setPageRowCountLimit(getConfiguration(jobContext), rowCount);
+  }
+
+  public static void setPageRowCountLimit(Configuration conf, int rowCount) {
+    conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
+  }
+
+  private static int getPageRowCountLimit(Configuration conf) {
+    return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
+  }
+
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -380,6 +393,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
+        .withPageRowCountLimit(getPageRowCountLimit(conf))
         .build();
 
     long blockSize = getLongBlockSize(conf);
@@ -398,6 +412,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
+      LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
     }
 
     WriteContext init = writeSupport.init(conf);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 5b0e4f8..1ed5e32 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -426,6 +426,17 @@ public class ParquetWriter<T> implements Closeable {
     }
 
     /**
+     * Sets the Parquet format page row count limit used by the constructed writer.
+     *
+     * @param rowCount limit for the number of rows stored in a page
+     * @return this builder for method chaining
+     */
+    public SELF withPageRowCountLimit(int rowCount) {
+      encodingPropsBuilder.withPageRowCountLimit(rowCount);
+      return self();
+    }
+
+    /**
      * Set the Parquet format dictionary page size used by the constructed
      * writer.
      *
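
For completeness, a hedged sketch of the Hadoop-level entry point added
above (the class name PageRowCountLimitConf and the value 5_000 are
illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.hadoop.ParquetOutputFormat;

    public class PageRowCountLimitConf {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Equivalent to conf.setInt("parquet.page.row.count.limit", 5_000);
        // getRecordWriter reads this and forwards it to the properties builder.
        ParquetOutputFormat.setPageRowCountLimit(conf, 5_000);
      }
    }

Writers constructed directly can use the equivalent
ParquetWriter.Builder#withPageRowCountLimit from the last hunk, which
delegates to the ParquetProperties builder shown earlier.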

