parquet-commits mailing list archives

From b...@apache.org
Subject parquet-mr git commit: PARQUET-99: Add page size check properties
Date Tue, 08 Dec 2015 22:41:48 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master a24d624aa -> 56326400f


PARQUET-99: Add page size check properties

This adds properties to set the minimum and maximum number of records between page size checks, as well as a property that controls whether the next check is estimated from the number of records already seen or always set to the minimum number of records between checks (a configuration sketch follows the list).

* `parquet.page.size.row.check.min` - minimum number of records between page size checks
* `parquet.page.size.row.check.max` - maximum number of records between page size checks
* `parquet.page.size.check.estimate` - whether to estimate the number of records before the next check, or to always use the minimum number of records
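
For example, a job could tune these checks through its Hadoop Configuration; a minimal sketch (the values shown are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // check the page size at least every 50 records, at most every 5000
    conf.setInt("parquet.page.size.row.check.min", 50);
    conf.setInt("parquet.page.size.row.check.max", 5000);
    // disable estimation: always wait the minimum record count between checks
    conf.setBoolean("parquet.page.size.check.estimate", false);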

This also updates the internal API to carry encoding settings in ParquetProperties (used in parquet-column), which reduces the number of parameters passed through internal APIs, and adds a builder for ParquetProperties so that other modules do not need to reference its defaults. A rough sketch of the builder usage follows.
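
As a sketch, a caller could construct ParquetProperties with the new builder like this (the values simply restate the defaults added in this patch):

    import org.apache.parquet.column.ParquetProperties;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;

    ParquetProperties props = ParquetProperties.builder()
        .withPageSize(1024 * 1024)                  // 1 MB page size threshold
        .withDictionaryPageSize(1024 * 1024)
        .withDictionaryEncoding(true)
        .withWriterVersion(WriterVersion.PARQUET_1_0)
        .withMinRowCountForPageSizeCheck(100)       // parquet.page.size.row.check.min
        .withMaxRowCountForPageSizeCheck(10000)     // parquet.page.size.row.check.max
        .estimateRowCountForPageSizeCheck(true)     // parquet.page.size.check.estimate
        .build();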

This closes #250

Author: Daniel Weeks <dweeks@netflix.com>
Author: Ryan Blue <blue@apache.org>

Closes #297 from rdblue/parquet-properties-update and squashes the following commits:

c93b73e [Ryan Blue] PARQUET-99: Use ParquetProperties to carry encoding config.
18f8d3a [Daniel Weeks] Spacing
2090719 [Daniel Weeks] Update sizeCheck to write page properly if estimating is turned off
71336ee [Daniel Weeks] Fixed param name
5d99072 [Daniel Weeks] Update page size checking for v2 writer
3f7870c [Daniel Weeks] Rebase to resolve byte buffer conflicts
68794f0 [Daniel Weeks] Merge branch 'master' into page_size_check
b49f03c [Daniel Weeks] Fixed reset of nextSizeCheck
a057f46 [Daniel Weeks] Fixed inverted property logic
e7cd54b [Daniel Weeks] Added property to toggle page size check estimation and initial row size checking


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/56326400
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/56326400
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/56326400

Branch: refs/heads/master
Commit: 56326400fcb5df7bd9336f143f7a3b7d601e5f58
Parents: a24d624
Author: Daniel Weeks <dweeks@netflix.com>
Authored: Tue Dec 8 14:41:38 2015 -0800
Committer: Ryan Blue <blue@apache.org>
Committed: Tue Dec 8 14:41:38 2015 -0800

----------------------------------------------------------------------
 .../parquet/column/ParquetProperties.java       | 243 +++++++++++++++----
 .../parquet/column/impl/ColumnWriteStoreV1.java |  21 +-
 .../parquet/column/impl/ColumnWriteStoreV2.java |  48 ++--
 .../parquet/column/impl/ColumnWriterV1.java     |  48 ++--
 .../parquet/column/impl/ColumnWriterV2.java     |  15 +-
 .../column/impl/TestColumnReaderImpl.java       |  11 +-
 .../column/impl/TestCorruptDeltaByteArrays.java |   6 +-
 .../parquet/column/mem/TestMemColumn.java       |   9 +-
 .../java/org/apache/parquet/io/PerfTest.java    |  10 +-
 .../org/apache/parquet/io/TestColumnIO.java     |  10 +-
 .../org/apache/parquet/io/TestFiltered.java     |  10 +-
 .../hadoop/InternalParquetRecordWriter.java     |  23 +-
 .../parquet/hadoop/ParquetOutputFormat.java     |  69 ++++--
 .../parquet/hadoop/ParquetRecordWriter.java     |  47 +++-
 .../apache/parquet/hadoop/ParquetWriter.java    |  55 ++---
 .../org/apache/parquet/hadoop/TestUtils.java    |   7 +-
 .../parquet/pig/TupleConsumerPerfTest.java      |   9 +-
 .../parquet/thrift/TestParquetReadProtocol.java |  10 +-
 18 files changed, 435 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index f8567a8..0c07d54 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
@@ -44,6 +45,7 @@ import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
 import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
 import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
 import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.schema.MessageType;
 
@@ -55,6 +57,16 @@ import org.apache.parquet.schema.MessageType;
  */
 public class ParquetProperties {
 
+  public static final int DEFAULT_PAGE_SIZE = 1024 * 1024;
+  public static final int DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
+  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+  public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0;
+  public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
+  public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
+  public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+
+  private static final int MIN_SLAB_SIZE = 64;
+
   public enum WriterVersion {
     PARQUET_1_0 ("v1"),
     PARQUET_2_0 ("v2");
@@ -75,53 +87,83 @@ public class ParquetProperties {
       return WriterVersion.valueOf(name);
     }
   }
+
+  private final int pageSizeThreshold;
   private final int dictionaryPageSizeThreshold;
   private final WriterVersion writerVersion;
   private final boolean enableDictionary;
+  private final int minRowCountForPageSizeCheck;
+  private final int maxRowCountForPageSizeCheck;
+  private final boolean estimateNextSizeCheck;
   private final ByteBufferAllocator allocator;
 
-  public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
-    this(dictPageSize, writerVersion, enableDict, new HeapByteBufferAllocator());
-  }
+  private final int initialSlabSize;
 
-  public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict, ByteBufferAllocator allocator) {
+  private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
+                            int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator) {
+    this.pageSizeThreshold = pageSize;
+    this.initialSlabSize = CapacityByteArrayOutputStream
+        .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
     this.dictionaryPageSizeThreshold = dictPageSize;
     this.writerVersion = writerVersion;
     this.enableDictionary = enableDict;
-    Preconditions.checkNotNull(allocator, "ByteBufferAllocator");
+    this.minRowCountForPageSizeCheck = minRowCountForPageSizeCheck;
+    this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
+    this.estimateNextSizeCheck = estimateNextSizeCheck;
     this.allocator = allocator;
   }
 
-  public ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
+  public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
+    return newColumnDescriptorValuesWriter(path.getMaxRepetitionLevel());
+  }
+
+  public ValuesWriter newDefinitionLevelWriter(ColumnDescriptor path) {
+    return newColumnDescriptorValuesWriter(path.getMaxDefinitionLevel());
+  }
+
+  private ValuesWriter newColumnDescriptorValuesWriter(int maxLevel) {
     if (maxLevel == 0) {
       return new DevNullValuesWriter();
     } else {
       return new RunLengthBitPackingHybridValuesWriter(
-          getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize, this.allocator
-      );
+          getWidthFromMaxInt(maxLevel), MIN_SLAB_SIZE, pageSizeThreshold, allocator);
     }
   }
 
-  private ValuesWriter plainWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+  public RunLengthBitPackingHybridEncoder newRepetitionLevelEncoder(ColumnDescriptor path) {
+    return newLevelEncoder(path.getMaxRepetitionLevel());
+  }
+
+  public RunLengthBitPackingHybridEncoder newDefinitionLevelEncoder(ColumnDescriptor path) {
+    return newLevelEncoder(path.getMaxDefinitionLevel());
+  }
+
+  private RunLengthBitPackingHybridEncoder newLevelEncoder(int maxLevel) {
+    return new RunLengthBitPackingHybridEncoder(
+        getWidthFromMaxInt(maxLevel), MIN_SLAB_SIZE, pageSizeThreshold, allocator);
+  }
+
+  private ValuesWriter plainWriter(ColumnDescriptor path) {
     switch (path.getType()) {
     case BOOLEAN:
       return new BooleanPlainValuesWriter();
     case INT96:
-      return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize, this.allocator);
+      return new FixedLenByteArrayPlainValuesWriter(12, initialSlabSize, pageSizeThreshold, allocator);
     case FIXED_LEN_BYTE_ARRAY:
-      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize, this.allocator);
+      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSlabSize, pageSizeThreshold, allocator);
     case BINARY:
     case INT32:
     case INT64:
     case DOUBLE:
     case FLOAT:
-      return new PlainValuesWriter(initialSizePerCol, pageSize, this.allocator);
+      return new PlainValuesWriter(initialSlabSize, pageSizeThreshold, allocator);
     default:
       throw new IllegalArgumentException("Unknown type " + path.getType());
     }
   }
 
-  private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, int initialSizePerCol) {
+  @SuppressWarnings("deprecation")
+  private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path) {
     Encoding encodingForDataPage;
     Encoding encodingForDictionaryPage;
     switch(writerVersion) {
@@ -158,24 +200,24 @@ public class ParquetProperties {
     }
   }
 
-  private ValuesWriter writerToFallbackTo(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+  private ValuesWriter writerToFallbackTo(ColumnDescriptor path) {
     switch(writerVersion) {
     case PARQUET_1_0:
-      return plainWriter(path, initialSizePerCol, pageSize);
+      return plainWriter(path);
     case PARQUET_2_0:
       switch (path.getType()) {
       case BOOLEAN:
-        return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize, this.allocator);
+        return new RunLengthBitPackingHybridValuesWriter(1, initialSlabSize, pageSizeThreshold, allocator);
       case BINARY:
       case FIXED_LEN_BYTE_ARRAY:
-        return new DeltaByteArrayWriter(initialSizePerCol, pageSize,this.allocator);
+        return new DeltaByteArrayWriter(initialSlabSize, pageSizeThreshold, allocator);
       case INT32:
-        return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize, this.allocator);
+        return new DeltaBinaryPackingValuesWriter(initialSlabSize, pageSizeThreshold, allocator);
       case INT96:
       case INT64:
       case DOUBLE:
       case FLOAT:
-        return plainWriter(path, initialSizePerCol, pageSize);
+        return plainWriter(path);
       default:
         throw new IllegalArgumentException("Unknown type " + path.getType());
       }
@@ -184,27 +226,27 @@ public class ParquetProperties {
     }
   }
 
-  private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
-    ValuesWriter writerToFallBackTo = writerToFallbackTo(path, initialSizePerCol, pageSize);
+  private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path) {
+    ValuesWriter writerToFallBackTo = writerToFallbackTo(path);
     if (enableDictionary) {
       return FallbackValuesWriter.of(
-          dictionaryWriter(path, initialSizePerCol),
+          dictionaryWriter(path),
           writerToFallBackTo);
     } else {
      return writerToFallBackTo;
     }
   }
 
-  public ValuesWriter getValuesWriter(ColumnDescriptor path, int initialSizePerCol, int pageSize) {
+  public ValuesWriter newValuesWriter(ColumnDescriptor path) {
     switch (path.getType()) {
     case BOOLEAN: // no dictionary encoding for boolean
-      return writerToFallbackTo(path, initialSizePerCol, pageSize);
+      return writerToFallbackTo(path);
     case FIXED_LEN_BYTE_ARRAY:
       // dictionary encoding for that type was not enabled in PARQUET 1.0
       if (writerVersion == WriterVersion.PARQUET_2_0) {
-        return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
+        return dictWriterWithFallBack(path);
       } else {
-       return writerToFallbackTo(path, initialSizePerCol, pageSize);
+       return writerToFallbackTo(path);
       }
     case BINARY:
     case INT32:
@@ -212,12 +254,16 @@ public class ParquetProperties {
     case INT96:
     case DOUBLE:
     case FLOAT:
-      return dictWriterWithFallBack(path, initialSizePerCol, pageSize);
+      return dictWriterWithFallBack(path);
     default:
       throw new IllegalArgumentException("Unknown type " + path.getType());
     }
   }
 
+  public int getPageSizeThreshold() {
+    return pageSizeThreshold;
+  }
+
   public int getDictionaryPageSizeThreshold() {
     return dictionaryPageSizeThreshold;
   }
@@ -234,26 +280,139 @@ public class ParquetProperties {
     return allocator;
   }
 
-  public ColumnWriteStore newColumnWriteStore(
-      MessageType schema,
-      PageWriteStore pageStore,
-      int pageSize,
-      ByteBufferAllocator allocator) {
+  public ColumnWriteStore newColumnWriteStore(MessageType schema,
+                                              PageWriteStore pageStore) {
     switch (writerVersion) {
     case PARQUET_1_0:
-      return new ColumnWriteStoreV1(
-          pageStore,
-          pageSize,
-          dictionaryPageSizeThreshold,
-          enableDictionary, writerVersion, allocator);
+      return new ColumnWriteStoreV1(pageStore, this);
     case PARQUET_2_0:
-      return new ColumnWriteStoreV2(
-          schema,
-          pageStore,
-          pageSize,
-          new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary, allocator));
+      return new ColumnWriteStoreV2(schema, pageStore, this);
     default:
       throw new IllegalArgumentException("unknown version " + writerVersion);
     }
   }
+
+  public int getMinRowCountForPageSizeCheck() {
+    return minRowCountForPageSizeCheck;
+  }
+
+  public int getMaxRowCountForPageSizeCheck() {
+    return maxRowCountForPageSizeCheck;
+  }
+
+  public boolean estimateNextSizeCheck() {
+    return estimateNextSizeCheck;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static Builder copy(ParquetProperties toCopy) {
+    return new Builder(toCopy);
+  }
+
+  public static class Builder {
+    private int pageSize = DEFAULT_PAGE_SIZE;
+    private int dictPageSize = DEFAULT_DICTIONARY_PAGE_SIZE;
+    private boolean enableDict = DEFAULT_IS_DICTIONARY_ENABLED;
+    private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
+    private int minRowCountForPageSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
+    private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
+    private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
+    private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+
+    private Builder() {
+    }
+
+    private Builder(ParquetProperties toCopy) {
+      this.enableDict = toCopy.enableDictionary;
+      this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
+      this.writerVersion = toCopy.writerVersion;
+      this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
+      this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
+      this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
+      this.allocator = toCopy.allocator;
+    }
+
+    /**
+     * Set the Parquet format page size.
+     *
+     * @param pageSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public Builder withPageSize(int pageSize) {
+      Preconditions.checkArgument(pageSize > 0,
+          "Invalid page size (must be positive): %s", pageSize);
+      this.pageSize = pageSize;
+      return this;
+    }
+
+    /**
+     * Enable or disable dictionary encoding.
+     *
+     * @param enableDictionary whether dictionary encoding should be enabled
+     * @return this builder for method chaining.
+     */
+    public Builder withDictionaryEncoding(boolean enableDictionary) {
+      this.enableDict = enableDictionary;
+      return this;
+    }
+
+    /**
+     * Set the Parquet format dictionary page size.
+     *
+     * @param dictionaryPageSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public Builder withDictionaryPageSize(int dictionaryPageSize) {
+      Preconditions.checkArgument(dictionaryPageSize > 0,
+          "Invalid dictionary page size (must be positive): %s", dictionaryPageSize);
+      this.dictPageSize = dictionaryPageSize;
+      return this;
+    }
+
+    /**
+     * Set the {@link WriterVersion format version}.
+     *
+     * @param version a {@code WriterVersion}
+     * @return this builder for method chaining.
+     */
+    public Builder withWriterVersion(WriterVersion version) {
+      this.writerVersion = version;
+      return this;
+    }
+
+    public Builder withMinRowCountForPageSizeCheck(int min) {
+      Preconditions.checkArgument(min > 0,
+          "Invalid min row count for page size check (must be positive): %s", min);
+      this.minRowCountForPageSizeCheck = min;
+      return this;
+    }
+
+    public Builder withMaxRowCountForPageSizeCheck(int max) {
+      Preconditions.checkArgument(max > 0,
+          "Invalid max row count for page size check (must be positive): %s", max);
+      this.maxRowCountForPageSizeCheck = max;
+      return this;
+    }
+
+    // Toggle estimation of the next size check. Disabling estimation avoids
+    // mispredictions when row sizes vary significantly.
+    public Builder estimateRowCountForPageSizeCheck(boolean estimateNextSizeCheck) {
+      this.estimateNextSizeCheck = estimateNextSizeCheck;
+      return this;
+    }
+
+    public Builder withAllocator(ByteBufferAllocator allocator) {
+      Preconditions.checkNotNull(allocator, "ByteBufferAllocator");
+      this.allocator = allocator;
+      return this;
+    }
+
+    public ParquetProperties build() {
+      return new ParquetProperties(writerVersion, pageSize, dictPageSize,
+          enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
+          estimateNextSizeCheck, allocator);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index 277c468..93a497f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -29,6 +29,7 @@ import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
@@ -37,20 +38,12 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
 
   private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
   private final PageWriteStore pageWriteStore;
-  private final int pageSizeThreshold;
-  private final int dictionaryPageSizeThreshold;
-  private final boolean enableDictionary;
-  private final WriterVersion writerVersion;
-  private final ByteBufferAllocator allocator;
-
-  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion, ByteBufferAllocator allocator) {
-    super();
+  private final ParquetProperties props;
+
+  public ColumnWriteStoreV1(PageWriteStore pageWriteStore,
+                            ParquetProperties props) {
     this.pageWriteStore = pageWriteStore;
-    this.pageSizeThreshold = pageSizeThreshold;
-    this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
-    this.enableDictionary = enableDictionary;
-    this.writerVersion = writerVersion;
-    this.allocator = allocator;
+    this.props = props;
   }
 
   public ColumnWriter getColumnWriter(ColumnDescriptor path) {
@@ -68,7 +61,7 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
 
   private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
     PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-    return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion, allocator);
+    return new ColumnWriterV1(path, pageWriter, props);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index 4126004..7574ced 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -29,7 +29,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
@@ -40,37 +39,31 @@ import org.apache.parquet.schema.MessageType;
 
 public class ColumnWriteStoreV2 implements ColumnWriteStore {
 
-  // will wait for at least that many records before checking again
-  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
-  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
  // will flush even if the size is below the threshold by this much to facilitate page alignment
   private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %
 
   private final Map<ColumnDescriptor, ColumnWriterV2> columns;
   private final Collection<ColumnWriterV2> writers;
-  private long rowCount;
-  private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+  private final ParquetProperties props;
   private final long thresholdTolerance;
-  private final ByteBufferAllocator allocator;
-
-  private int pageSizeThreshold;
+  private long rowCount;
+  private long rowCountForNextSizeCheck;
 
   public ColumnWriteStoreV2(
       MessageType schema,
       PageWriteStore pageWriteStore,
-      int pageSizeThreshold,
-      ParquetProperties parquetProps) {
-    super();
-    this.pageSizeThreshold = pageSizeThreshold;
-    this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
-    this.allocator = parquetProps.getAllocator();
+      ParquetProperties props) {
+    this.props = props;
+    this.thresholdTolerance = (long)(props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
     Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, parquetProps, pageSizeThreshold));
+      mcolumns.put(path, new ColumnWriterV2(path, pageWriter, props));
     }
     this.columns = unmodifiableMap(mcolumns);
     this.writers = this.columns.values();
+
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
   }
 
   public ColumnWriter getColumnWriter(ColumnDescriptor path) {
@@ -151,27 +144,32 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
     for (ColumnWriterV2 writer : writers) {
       long usedMem = writer.getCurrentPageBufferedSize();
       long rows = rowCount - writer.getRowsWrittenSoFar();
-      long remainingMem = pageSizeThreshold - usedMem;
+      long remainingMem = props.getPageSizeThreshold() - usedMem;
       if (remainingMem <= thresholdTolerance) {
         writer.writePage(rowCount);
-        remainingMem = pageSizeThreshold;
+        remainingMem = props.getPageSizeThreshold();
       }
       long rowsToFillPage =
           usedMem == 0 ?
-              MAXIMUM_RECORD_COUNT_FOR_CHECK
+              props.getMaxRowCountForPageSizeCheck()
               : (long)((float)rows) / usedMem * remainingMem;
       if (rowsToFillPage < minRecordToWait) {
         minRecordToWait = rowsToFillPage;
       }
     }
     if (minRecordToWait == Long.MAX_VALUE) {
-      minRecordToWait = MINIMUM_RECORD_COUNT_FOR_CHECK;
+      minRecordToWait = props.getMinRowCountForPageSizeCheck();
+    }
+
+    if (props.estimateNextSizeCheck()) {
+      // will check again halfway, clamped between the min and max row counts
+      rowCountForNextSizeCheck = rowCount +
+          min(
+              max(minRecordToWait / 2, props.getMinRowCountForPageSizeCheck()),
+              props.getMaxRowCountForPageSizeCheck());
+    } else {
+      rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
     }
-    // will check again halfway
-    rowCountForNextSizeCheck = rowCount +
-        min(
-            max(minRecordToWait / 2, MINIMUM_RECORD_COUNT_FOR_CHECK), // no less than MINIMUM_RECORD_COUNT_FOR_CHECK
-            MAXIMUM_RECORD_COUNT_FOR_CHECK); // no more than MAXIMUM_RECORD_COUNT_FOR_CHECK
   }
 
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index f010df8..dc6ebec 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -23,12 +23,9 @@ import static org.apache.parquet.bytes.BytesInput.concat;
 import java.io.IOException;
 
 import org.apache.parquet.Log;
-import org.apache.parquet.bytes.ByteBufferAllocator;
-import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
@@ -47,12 +44,11 @@ import static java.lang.Math.max;
 final class ColumnWriterV1 implements ColumnWriter {
   private static final Log LOG = Log.getLog(ColumnWriterV1.class);
   private static final boolean DEBUG = Log.DEBUG;
-  private static final int INITIAL_COUNT_FOR_SIZE_CHECK = 100;
-  private static final int MIN_SLAB_SIZE = 64;
 
   private final ColumnDescriptor path;
   private final PageWriter pageWriter;
-  private final long pageSizeThreshold;
+  private final ParquetProperties props;
+
   private ValuesWriter repetitionLevelColumn;
   private ValuesWriter definitionLevelColumn;
   private ValuesWriter dataColumn;
@@ -61,28 +57,20 @@ final class ColumnWriterV1 implements ColumnWriter {
 
   private Statistics statistics;
 
-  public ColumnWriterV1(
-      ColumnDescriptor path,
-      PageWriter pageWriter,
-      int pageSizeThreshold,
-      int dictionaryPageSizeThreshold,
-      boolean enableDictionary,
-      WriterVersion writerVersion,
-      ByteBufferAllocator allocator) {
+  public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
+                        ParquetProperties props) {
     this.path = path;
     this.pageWriter = pageWriter;
-    this.pageSizeThreshold = pageSizeThreshold;
-    // initial check of memory usage. So that we have enough data to make an initial prediction
-    this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
-    resetStatistics();
+    this.props = props;
 
-    ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary, allocator);
+    // initial check of memory usage, so that we have enough data to make an initial prediction
+    this.valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
 
-    this.repetitionLevelColumn = parquetProps.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
-    this.definitionLevelColumn = parquetProps.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+    resetStatistics();
 
-    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
-    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
+    this.repetitionLevelColumn = props.newRepetitionLevelWriter(path);
+    this.definitionLevelColumn = props.newDefinitionLevelWriter(path);
+    this.dataColumn = props.newValuesWriter(path);
   }
 
   private void log(Object value, int r, int d) {
@@ -109,13 +97,19 @@ final class ColumnWriterV1 implements ColumnWriter {
       long memSize = repetitionLevelColumn.getBufferedSize()
           + definitionLevelColumn.getBufferedSize()
           + dataColumn.getBufferedSize();
-      if (memSize > pageSizeThreshold) {
+      if (memSize > props.getPageSizeThreshold()) {
         // we will write the current page and check again the size at the predicted middle of next page
-        valueCountForNextSizeCheck = valueCount / 2;
+        if (props.estimateNextSizeCheck()) {
+          valueCountForNextSizeCheck = valueCount / 2;
+        } else {
+          valueCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+        }
         writePage();
-      } else {
+      } else if (props.estimateNextSizeCheck()) {
         // not reached the threshold, will check again midway
-        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * pageSizeThreshold / memSize)) / 2 + 1;
+        valueCountForNextSizeCheck = (int)(valueCount + ((float)valueCount * props.getPageSizeThreshold() / memSize)) / 2 + 1;
+      } else {
+        valueCountForNextSizeCheck += props.getMinRowCountForPageSizeCheck();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index 8249b72..396d53a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -25,7 +25,6 @@ import java.io.IOException;
 
 import org.apache.parquet.Ints;
 import org.apache.parquet.Log;
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -49,7 +48,6 @@ import org.apache.parquet.io.api.Binary;
 final class ColumnWriterV2 implements ColumnWriter {
   private static final Log LOG = Log.getLog(ColumnWriterV2.class);
   private static final boolean DEBUG = Log.DEBUG;
-  private static final int MIN_SLAB_SIZE = 64;
 
   private final ColumnDescriptor path;
   private final PageWriter pageWriter;
@@ -64,19 +62,14 @@ final class ColumnWriterV2 implements ColumnWriter {
   public ColumnWriterV2(
       ColumnDescriptor path,
       PageWriter pageWriter,
-      ParquetProperties parquetProps,
-      int pageSize) {
+      ParquetProperties props) {
     this.path = path;
     this.pageWriter = pageWriter;
     resetStatistics();
 
-    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(
-        getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize, parquetProps.getAllocator());
-    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(
-        getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize, parquetProps.getAllocator());
-
-    int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
-    this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
+    this.repetitionLevelColumn = props.newRepetitionLevelEncoder(path);
+    this.definitionLevelColumn = props.newDefinitionLevelEncoder(path);
+    this.dataColumn = props.newValuesWriter(path);
   }
 
   private void log(Object value, int r, int d) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
index 6792361..d2d78c4 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -25,7 +25,6 @@ import java.util.List;
 
 import org.apache.parquet.Version;
 import org.apache.parquet.VersionParser;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.junit.Test;
 
 import org.apache.parquet.column.ColumnDescriptor;
@@ -59,7 +58,10 @@ public class TestColumnReaderImpl {
     MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
     ColumnDescriptor col = schema.getColumns().get(0);
     MemPageWriter pageWriter = new MemPageWriter();
-    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter,
+        ParquetProperties.builder()
+            .withDictionaryPageSize(1024).withWriterVersion(PARQUET_2_0)
+            .withPageSize(2048).build());
     for (int i = 0; i < rows; i++) {
       columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
       if ((i + 1) % 1000 == 0) {
@@ -94,7 +96,10 @@ public class TestColumnReaderImpl {
     MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }");
     ColumnDescriptor col = schema.getColumns().get(0);
     MemPageWriter pageWriter = new MemPageWriter();
-    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter, new ParquetProperties(1024, PARQUET_2_0, true), 2048);
+    ColumnWriterV2 columnWriterV2 = new ColumnWriterV2(col, pageWriter,
+        ParquetProperties.builder()
+            .withDictionaryPageSize(1024).withWriterVersion(PARQUET_2_0)
+            .withPageSize(2048).build());
     for (int i = 0; i < rows; i++) {
       columnWriterV2.writeNull(0, 0);
       if ((i + 1) % 1000 == 0) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
index 9bb2759..1f39d95 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
@@ -191,10 +191,12 @@ public class TestCorruptDeltaByteArrays {
     MemPageStore pages = new MemPageStore(0);
     PageWriter memWriter = pages.getPageWriter(column);
 
-    ParquetProperties parquetProps = new ParquetProperties(0, ParquetProperties.WriterVersion.PARQUET_1_0, false, new HeapByteBufferAllocator());
+    ParquetProperties parquetProps = ParquetProperties.builder()
+        .withDictionaryEncoding(false)
+        .build();
 
     // get generic repetition and definition level bytes to use for pages
-    ValuesWriter rdValues = parquetProps.getColumnDescriptorValuesWriter(0, 10, 100);
+    ValuesWriter rdValues = parquetProps.newDefinitionLevelWriter(column);
     for (int i = 0; i < 10; i += 1) {
       rdValues.writeInteger(0);
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index 044fe2a..42c1776 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -20,14 +20,13 @@ package org.apache.parquet.column.mem;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
 import org.junit.Test;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
 import org.apache.parquet.column.ColumnWriter;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.mem.MemPageStore;
@@ -161,6 +160,10 @@ public class TestMemColumn {
   }
 
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
-    return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
+    return new ColumnWriteStoreV1(memPageStore,
+        ParquetProperties.builder()
+            .withPageSize(2048)
+            .withDictionaryEncoding(false)
+            .build());
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
index aff3937..e4687b1 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
@@ -27,8 +27,7 @@ import static org.apache.parquet.example.Paper.schema3;
 import java.util.logging.Level;
 
 import org.apache.parquet.Log;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.example.DummyRecordConverter;
@@ -78,7 +77,12 @@ public class PerfTest {
 
 
   private static void write(MemPageStore memPageStore) {
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(
+        memPageStore,
+        ParquetProperties.builder()
+            .withPageSize(50*1024*1024)
+            .withDictionaryEncoding(false)
+            .build());
     MessageColumnIO columnIO = newColumnFactory(schema);
 
     GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
index 06f22b6..e9e599a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
@@ -38,7 +38,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -48,7 +48,6 @@ import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.mem.MemPageStore;
@@ -527,7 +526,12 @@ public class TestColumnIO {
   }
 
   private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
-    return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
+    return new ColumnWriteStoreV1(memPageStore,
+        ParquetProperties.builder()
+            .withPageSize(800)
+            .withDictionaryPageSize(800)
+            .withDictionaryEncoding(useDictionary)
+            .build());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
index 25b629b..ab5a575 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
@@ -21,11 +21,10 @@ package org.apache.parquet.io;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.junit.Test;
 
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.example.data.Group;
@@ -259,7 +258,12 @@ public class TestFiltered {
 
   private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
     MemPageStore memPageStore = new MemPageStore(number * 2);
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(
+        memPageStore,
+        ParquetProperties.builder()
+            .withPageSize(800)
+            .withDictionaryEncoding(false)
+            .build());
 
     RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     GroupWriter groupWriter = new GroupWriter(recordWriter, schema);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 2b1d48b..74feb39 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -28,11 +28,9 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -54,10 +52,9 @@ class InternalParquetRecordWriter<T> {
   private final long rowGroupSize;
   private long rowGroupSizeThreshold;
   private long nextRowGroupSize;
-  private final int pageSize;
   private final BytesCompressor compressor;
   private final boolean validating;
-  private final ParquetProperties parquetProperties;
+  private final ParquetProperties props;
 
   private long recordCount = 0;
   private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
@@ -67,7 +64,6 @@ class InternalParquetRecordWriter<T> {
   private ColumnChunkPageWriteStore pageStore;
   private RecordConsumer recordConsumer;
 
-
   /**
    * @param parquetFileWriter the file to write to
    * @param writeSupport the class to convert incoming records
@@ -82,13 +78,9 @@ class InternalParquetRecordWriter<T> {
       MessageType schema,
       Map<String, String> extraMetaData,
       long rowGroupSize,
-      int pageSize,
       BytesCompressor compressor,
-      int dictionaryPageSize,
-      boolean enableDictionary,
       boolean validating,
-      WriterVersion writerVersion,
-      ByteBufferAllocator allocator) {
+      ParquetProperties props) {
     this.parquetFileWriter = parquetFileWriter;
     this.writeSupport = checkNotNull(writeSupport, "writeSupport");
     this.schema = schema;
@@ -96,20 +88,15 @@ class InternalParquetRecordWriter<T> {
     this.rowGroupSize = rowGroupSize;
     this.rowGroupSizeThreshold = rowGroupSize;
     this.nextRowGroupSize = rowGroupSizeThreshold;
-    this.pageSize = pageSize;
     this.compressor = compressor;
     this.validating = validating;
-    this.parquetProperties = new ParquetProperties(dictionaryPageSize, writerVersion, enableDictionary, allocator);
+    this.props = props;
     initStore();
   }
 
   private void initStore() {
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, parquetProperties.getAllocator());
-    columnStore = parquetProperties.newColumnWriteStore(
-        schema,
-        pageStore,
-        pageSize,
-        parquetProperties.getAllocator());
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator());
+    columnStore = props.newColumnWriteStore(schema, pageStore);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     this.recordConsumer = columnIO.getRecordWriter(columnStore);
     writeSupport.prepareForWrite(recordConsumer);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 562bffc..8979dba 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -21,7 +21,6 @@ package org.apache.parquet.hadoop;
 import static org.apache.parquet.Log.INFO;
 import static org.apache.parquet.Preconditions.checkNotNull;
 import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
-import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
 import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
 
 import java.io.IOException;
@@ -37,6 +36,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 import org.apache.parquet.Log;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -141,6 +141,9 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String MEMORY_POOL_RATIO    = "parquet.memory.pool.ratio";
   public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size";
   public static final String MAX_PADDING_BYTES    = "parquet.writer.max-padding";
+  public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min";
+  public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
+  public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
 
   // default to no padding for now
   private static final int DEFAULT_MAX_PADDING_SIZE = 0;
@@ -238,7 +241,23 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   }
 
   public static boolean getEnableDictionary(Configuration configuration) {
-    return configuration.getBoolean(ENABLE_DICTIONARY, true);
+    return configuration.getBoolean(
+        ENABLE_DICTIONARY, ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED);
+  }
+
+  public static int getMinRowCountForPageSizeCheck(Configuration configuration) {
+    return configuration.getInt(MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK,
+        ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK);
+  }
+
+  public static int getMaxRowCountForPageSizeCheck(Configuration configuration) {
+    return configuration.getInt(MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK,
+        ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK);
+  }
+
+  public static boolean getEstimatePageSizeCheck(Configuration configuration) {
+    return configuration.getBoolean(ESTIMATE_PAGE_SIZE_CHECK,
+        ParquetProperties.DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK);
   }
 
   @Deprecated
@@ -251,15 +270,17 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   }
 
   public static int getPageSize(Configuration configuration) {
-    return configuration.getInt(PAGE_SIZE, DEFAULT_PAGE_SIZE);
+    return configuration.getInt(PAGE_SIZE, ParquetProperties.DEFAULT_PAGE_SIZE);
   }
 
   public static int getDictionaryPageSize(Configuration configuration) {
-    return configuration.getInt(DICTIONARY_PAGE_SIZE, DEFAULT_PAGE_SIZE);
+    return configuration.getInt(
+        DICTIONARY_PAGE_SIZE, ParquetProperties.DEFAULT_DICTIONARY_PAGE_SIZE);
   }
 
   public static WriterVersion getWriterVersion(Configuration configuration) {
-    String writerVersion = configuration.get(WRITER_VERSION, WriterVersion.PARQUET_1_0.toString());
+    String writerVersion = configuration.get(
+        WRITER_VERSION, ParquetProperties.DEFAULT_WRITER_VERSION.toString());
     return WriterVersion.fromString(writerVersion);
   }
 
@@ -341,22 +362,32 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         throws IOException, InterruptedException {
     final WriteSupport<T> writeSupport = getWriteSupport(conf);
 
+    ParquetProperties props = ParquetProperties.builder()
+        .withPageSize(getPageSize(conf))
+        .withDictionaryPageSize(getDictionaryPageSize(conf))
+        .withDictionaryEncoding(getEnableDictionary(conf))
+        .withWriterVersion(getWriterVersion(conf))
+        .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
+        .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
+        .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
+        .build();
+
     long blockSize = getLongBlockSize(conf);
-    if (INFO) LOG.info("Parquet block size to " + blockSize);
-    int pageSize = getPageSize(conf);
-    if (INFO) LOG.info("Parquet page size to " + pageSize);
-    int dictionaryPageSize = getDictionaryPageSize(conf);
-    if (INFO) LOG.info("Parquet dictionary page size to " + dictionaryPageSize);
-    boolean enableDictionary = getEnableDictionary(conf);
-    if (INFO) LOG.info("Dictionary is " + (enableDictionary ? "on" : "off"));
+    int maxPaddingSize = getMaxPaddingSize(conf);
     boolean validating = getValidation(conf);
+
+    if (INFO) LOG.info("Parquet block size to " + blockSize);
+    if (INFO) LOG.info("Parquet page size to " + props.getPageSizeThreshold());
+    if (INFO) LOG.info("Parquet dictionary page size to " + props.getDictionaryPageSizeThreshold());
+    if (INFO) LOG.info("Dictionary is " + (props.isEnableDictionary() ? "on" : "off"));
     if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
-    WriterVersion writerVersion = getWriterVersion(conf);
-    if (INFO) LOG.info("Writer version is: " + writerVersion);
-    int maxPaddingSize = getMaxPaddingSize(conf);
+    if (INFO) LOG.info("Writer version is: " + props.getWriterVersion());
     if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
+    if (INFO) LOG.info("Page size checking is: " + (props.estimateNextSizeCheck() ? "estimated" : "constant"));
+    if (INFO) LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
+    if (INFO) LOG.info("Max row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());
 
-    CodecFactory codecFactory = new CodecFactory(conf, pageSize);
+    CodecFactory codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
 
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(
@@ -379,12 +410,10 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         writeSupport,
         init.getSchema(),
         init.getExtraMetaData(),
-        blockSize, pageSize,
+        blockSize,
         codecFactory.getCompressor(codec),
-        dictionaryPageSize,
-        enableDictionary,
         validating,
-        writerVersion,
+        props,
         memoryManager);
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
index eefb257..6c94fac 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
@@ -70,9 +70,15 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       boolean enableDictionary,
       boolean validating,
       WriterVersion writerVersion) {
+    ParquetProperties props = ParquetProperties.builder()
+        .withPageSize(pageSize)
+        .withDictionaryPageSize(dictionaryPageSize)
+        .withDictionaryEncoding(enableDictionary)
+        .withWriterVersion(writerVersion)
+        .build();
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-        extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
-      validating, writerVersion, new HeapByteBufferAllocator());
+        extraMetaData, blockSize, compressor, validating, props);
+    this.memoryManager = null;
   }
 
   /**
@@ -87,6 +93,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
    * @param enableDictionary to enable the dictionary
    * @param validating if schema validation should be turned on
    */
+  @Deprecated
   public ParquetRecordWriter(
       ParquetFileWriter w,
       WriteSupport<T> writeSupport,
@@ -99,9 +106,39 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {
       boolean validating,
       WriterVersion writerVersion,
       MemoryManager memoryManager) {
+    this(w, writeSupport, schema, extraMetaData, blockSize, compressor,
+        validating, ParquetProperties.builder()
+            .withPageSize(pageSize)
+            .withDictionaryPageSize(dictionaryPageSize)
+            .withDictionaryEncoding(enableDictionary)
+            .withWriterVersion(writerVersion)
+            .build(),
+        memoryManager);
+  }
+
+  /**
+   *
+   * @param w the file to write to
+   * @param writeSupport the class to convert incoming records
+   * @param schema the schema of the records
+   * @param extraMetaData extra meta data to write in the footer of the file
+   * @param blockSize the size of a block in the file (this will be approximate)
+   * @param compressor the compressor used to compress the pages
+   * @param validating if schema validation should be turned on
+   * @param props parquet encoding properties
+   */
+  ParquetRecordWriter(
+      ParquetFileWriter w,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      long blockSize,
+      BytesCompressor compressor,
+      boolean validating,
+      ParquetProperties props,
+      MemoryManager memoryManager) {
     internalWriter = new InternalParquetRecordWriter<T>(w, writeSupport, schema,
-      extraMetaData, blockSize, pageSize, compressor, dictionaryPageSize, enableDictionary,
-      validating, writerVersion, new HeapByteBufferAllocator());
+        extraMetaData, blockSize, compressor, validating, props);
     this.memoryManager = checkNotNull(memoryManager, "memoryManager");
     memoryManager.addWriter(internalWriter, blockSize);
   }
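
The constructor changes above are a parameter-object refactoring: the
deprecated public constructor keeps its long signature for source
compatibility, but immediately folds the encoding-related arguments into a
ParquetProperties and delegates to the new package-private constructor, so
there is a single code path that reads the settings. A self-contained sketch
of the pattern; EncodingProps and LegacyWriter are hypothetical names, not
parquet-mr API.

    // Hypothetical illustration of the delegation used above.
    class EncodingProps {
      final int pageSize;
      final boolean dictionary;
      EncodingProps(int pageSize, boolean dictionary) {
        this.pageSize = pageSize;
        this.dictionary = dictionary;
      }
    }

    class LegacyWriter {
      private final EncodingProps props;

      @Deprecated // old telescoping signature, kept for compatibility
      LegacyWriter(int pageSize, boolean dictionary) {
        this(new EncodingProps(pageSize, dictionary));
      }

      // new canonical constructor: one parameter object instead of many args
      LegacyWriter(EncodingProps props) {
        this.props = props;
      }
    }
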

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index be8c0cd..f58dda4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -29,7 +29,6 @@ import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 /**
  * Write records to a Parquet file.
@@ -37,13 +36,15 @@ import org.apache.parquet.bytes.HeapByteBufferAllocator;
 public class ParquetWriter<T> implements Closeable {
 
   public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
-  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+  public static final int DEFAULT_PAGE_SIZE =
+      ParquetProperties.DEFAULT_PAGE_SIZE;
   public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
       CompressionCodecName.UNCOMPRESSED;
-  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED =
+      ParquetProperties.DEFAULT_IS_DICTIONARY_ENABLED;
   public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
   public static final WriterVersion DEFAULT_WRITER_VERSION =
-      WriterVersion.PARQUET_1_0;
+      ParquetProperties.DEFAULT_WRITER_VERSION;
 
   public static final String OBJECT_MODEL_NAME_PROP = "writer.model.name";
 
@@ -217,9 +218,14 @@ public class ParquetWriter<T> implements Closeable {
       boolean validating,
       WriterVersion writerVersion,
       Configuration conf) throws IOException {
-    this(file, mode, writeSupport, compressionCodecName, blockSize, pageSize,
-        dictionaryPageSize, enableDictionary, validating, writerVersion, conf,
-        MAX_PADDING_SIZE_DEFAULT);
+    this(file, mode, writeSupport, compressionCodecName, blockSize,
+        validating, conf, MAX_PADDING_SIZE_DEFAULT,
+        ParquetProperties.builder()
+            .withPageSize(pageSize)
+            .withDictionaryPageSize(dictionaryPageSize)
+            .withDictionaryEncoding(enableDictionary)
+            .withWriterVersion(writerVersion)
+            .build());
   }
 
   /**
@@ -255,13 +261,10 @@ public class ParquetWriter<T> implements Closeable {
       WriteSupport<T> writeSupport,
       CompressionCodecName compressionCodecName,
       int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
       boolean validating,
-      WriterVersion writerVersion,
       Configuration conf,
-      int maxPaddingSize) throws IOException {
+      int maxPaddingSize,
+      ParquetProperties encodingProps) throws IOException {
 
     WriteSupport.WriteContext writeContext = writeSupport.init(conf);
     MessageType schema = writeContext.getSchema();
@@ -270,7 +273,7 @@ public class ParquetWriter<T> implements Closeable {
         conf, schema, file, mode, blockSize, maxPaddingSize);
     fileWriter.start();
 
-    CodecFactory codecFactory = new CodecFactory(conf, pageSize);
+    CodecFactory codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
     CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName);
     this.writer = new InternalParquetRecordWriter<T>(
         fileWriter,
@@ -278,13 +281,9 @@ public class ParquetWriter<T> implements Closeable {
         schema,
         writeContext.getExtraMetaData(),
         blockSize,
-        pageSize,
         compressor,
-        dictionaryPageSize,
-        enableDictionary,
         validating,
-        writerVersion,
-        new HeapByteBufferAllocator());
+        encodingProps);
   }
 
   public void write(T object) throws IOException {
@@ -326,12 +325,10 @@ public class ParquetWriter<T> implements Closeable {
     private ParquetFileWriter.Mode mode;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
     private int rowGroupSize = DEFAULT_BLOCK_SIZE;
-    private int pageSize = DEFAULT_PAGE_SIZE;
-    private int dictionaryPageSize = DEFAULT_PAGE_SIZE;
     private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
-    private boolean enableDictionary = DEFAULT_IS_DICTIONARY_ENABLED;
     private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
-    private WriterVersion writerVersion = DEFAULT_WRITER_VERSION;
+    private ParquetProperties.Builder encodingPropsBuilder =
+        ParquetProperties.builder();
 
     protected Builder(Path file) {
       this.file = file;
@@ -400,7 +397,7 @@ public class ParquetWriter<T> implements Closeable {
      * @return this builder for method chaining.
      */
     public SELF withPageSize(int pageSize) {
-      this.pageSize = pageSize;
+      encodingPropsBuilder.withPageSize(pageSize);
       return self();
     }
 
@@ -412,7 +409,7 @@ public class ParquetWriter<T> implements Closeable {
      * @return this builder for method chaining.
      */
     public SELF withDictionaryPageSize(int dictionaryPageSize) {
-      this.dictionaryPageSize = dictionaryPageSize;
+      encodingPropsBuilder.withDictionaryPageSize(dictionaryPageSize);
       return self();
     }
 
@@ -435,7 +432,7 @@ public class ParquetWriter<T> implements Closeable {
      * @return this builder for method chaining.
      */
     public SELF enableDictionaryEncoding() {
-      this.enableDictionary = true;
+      encodingPropsBuilder.withDictionaryEncoding(true);
       return self();
     }
 
@@ -446,7 +443,7 @@ public class ParquetWriter<T> implements Closeable {
      * @return this builder for method chaining.
      */
     public SELF withDictionaryEncoding(boolean enableDictionary) {
-      this.enableDictionary = enableDictionary;
+      encodingPropsBuilder.withDictionaryEncoding(enableDictionary);
       return self();
     }
 
@@ -479,7 +476,7 @@ public class ParquetWriter<T> implements Closeable {
      * @return this builder for method chaining.
      */
     public SELF withWriterVersion(WriterVersion version) {
-      this.writerVersion = version;
+      encodingPropsBuilder.withWriterVersion(version);
       return self();
     }
 
@@ -491,8 +488,8 @@ public class ParquetWriter<T> implements Closeable {
      */
     public ParquetWriter<T> build() throws IOException {
       return new ParquetWriter<T>(file, mode, getWriteSupport(conf), codecName,
-          rowGroupSize, pageSize, dictionaryPageSize, enableDictionary,
-          enableValidation, writerVersion, conf, maxPaddingSize);
+          rowGroupSize, enableValidation, conf, maxPaddingSize,
+          encodingPropsBuilder.build());
     }
   }
 }
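
Because the with* methods now delegate to an internal
ParquetProperties.Builder, existing user code built on ParquetWriter.Builder
is unaffected. A sketch of typical usage, assuming the Avro object-model
binding (parquet-avro), whose AvroParquetWriter.builder extends the Builder
shown above; the output path and schema are placeholders.

    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.column.ParquetProperties.WriterVersion;
    import org.apache.parquet.hadoop.ParquetWriter;

    public class BuilderExample {
      public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"r\",\"fields\":"
            + "[{\"name\":\"x\",\"type\":\"int\"}]}");

        ParquetWriter<GenericRecord> writer = AvroParquetWriter
            .<GenericRecord>builder(new Path("/tmp/example.parquet"))
            .withSchema(schema)
            .withPageSize(1024 * 1024)    // forwards to ParquetProperties.Builder
            .withDictionaryEncoding(true) // forwards to ParquetProperties.Builder
            .withWriterVersion(WriterVersion.PARQUET_1_0)
            .build();

        GenericRecord record = new GenericData.Record(schema);
        record.put("x", 1);
        writer.write(record);
        writer.close();
      }
    }
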

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java
index 7c5a186..e53ac78 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestUtils.java
@@ -53,7 +53,12 @@ public class TestUtils {
       Assert.fail("No exception was thrown (" + message + "), expected: " +
           expected.getName());
     } catch (Exception actual) {
-      Assert.assertEquals(message, expected, actual.getClass());
+      try {
+        Assert.assertEquals(message, expected, actual.getClass());
+      } catch (AssertionError e) {
+        e.addSuppressed(actual);
+        throw e;
+      }
     }
   }
 }
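
The TestUtils change fixes a diagnosability gap: when an exception of the
wrong class was thrown, the assertEquals failure swallowed it, so the actual
stack trace never surfaced. Attaching it with Throwable.addSuppressed
(available since Java 7) makes the unexpected exception print beneath the
assertion failure as "Suppressed: ...". A small standalone sketch of the
mechanism:

    public class SuppressedDemo {
      public static void main(String[] args) {
        try {
          throw new IllegalStateException("the real, unexpected failure");
        } catch (Exception actual) {
          AssertionError e = new AssertionError(
              "expected IllegalArgumentException but got " + actual.getClass());
          e.addSuppressed(actual); // keep the original failure visible
          throw e; // trace now includes "Suppressed: IllegalStateException..."
        }
      }
    }
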

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
index c050922..ff192e2 100644
--- a/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
+++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TupleConsumerPerfTest.java
@@ -22,7 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Level;
 
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.NonSpillableDataBag;
@@ -32,7 +32,6 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.parser.ParserException;
 
 import org.apache.parquet.Log;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.mem.MemPageStore;
@@ -60,7 +59,11 @@ public class TupleConsumerPerfTest {
     MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema));
 
     MemPageStore memPageStore = new MemPageStore(0);
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(
+        memPageStore, ParquetProperties.builder()
+            .withPageSize(50*1024*1024)
+            .withDictionaryEncoding(false)
+            .build());
     write(memPageStore, columns, schema, pigSchema);
     columns.flush();
     read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/56326400/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
----------------------------------------------------------------------
diff --git a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
index f954e4c..97e0054 100644
--- a/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
+++ b/parquet-thrift/src/test/java/org/apache/parquet/thrift/TestParquetReadProtocol.java
@@ -31,7 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
 import thrift.test.OneOfEach;
 
 import org.apache.thrift.TBase;
@@ -39,7 +39,6 @@ import org.apache.thrift.TException;
 import org.junit.Test;
 
 import org.apache.parquet.Log;
-import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.io.ColumnIOFactory;
@@ -149,8 +148,11 @@ public class TestParquetReadProtocol {
     final MessageType schema = schemaConverter.convert(thriftClass);
     LOG.info(schema);
     final MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema);
-    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 10000, 10000, false,
-        WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
+    final ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore,
+        ParquetProperties.builder()
+            .withPageSize(10000)
+            .withDictionaryEncoding(false)
+            .build());
     final RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     final StructType thriftType = schemaConverter.toStructType(thriftClass);
     ParquetWriteProtocol parquetWriteProtocol = new ParquetWriteProtocol(recordWriter, columnIO, thriftType);

