parquet-commits mailing list archives

From ga...@apache.org
Subject [parquet-mr] branch column-indexes updated: PARQUET-1214: Column indexes: Truncate min/max values (#481)
Date Mon, 09 Jul 2018 13:33:08 GMT
This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/column-indexes by this push:
     new dc645db  PARQUET-1214: Column indexes: Truncate min/max values (#481)
dc645db is described below

commit dc645db1395d5712cb10742a144bf348a72a6778
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Mon Jul 9 15:33:03 2018 +0200

    PARQUET-1214: Column indexes: Truncate min/max values (#481)
---
 .../apache/parquet/column/ParquetProperties.java   |  18 +-
 .../columnindex/BinaryColumnIndexBuilder.java      |  11 +-
 .../column/columnindex/BinaryTruncator.java        | 208 +++++++++++++++
 .../column/columnindex/ColumnIndexBuilder.java     |  14 +-
 .../column/columnindex/TestBinaryTruncator.java    | 285 +++++++++++++++++++++
 .../column/columnindex/TestColumnIndexBuilder.java |  50 ++--
 .../parquet/hadoop/ColumnChunkPageWriteStore.java  |  10 +-
 .../hadoop/InternalParquetRecordWriter.java        |   4 +-
 .../apache/parquet/hadoop/ParquetFileWriter.java   |  24 +-
 .../apache/parquet/hadoop/ParquetOutputFormat.java |  17 +-
 .../org/apache/parquet/hadoop/ParquetWriter.java   |   2 +-
 .../converter/TestParquetMetadataConverter.java    |   2 +-
 .../hadoop/TestColumnChunkPageWriteStore.java      |   5 +-
 13 files changed, 603 insertions(+), 47 deletions(-)
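
A minimal usage sketch of the new writer property introduced by this commit, assuming ParquetProperties.builder() is the usual factory for the Builder shown in the diff below (the class name TruncateLengthExample is illustrative only):

import org.apache.parquet.column.ParquetProperties;

public class TruncateLengthExample {
  public static void main(String[] args) {
    // withColumnIndexTruncateLength(int) is added by this commit;
    // the default is DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH (64).
    ParquetProperties props = ParquetProperties.builder()
        .withColumnIndexTruncateLength(16)
        .build();
    System.out.println(props.getColumnIndexTruncateLength()); // prints 16
  }
}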

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index f965511..b173239 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -47,6 +47,7 @@ public class ParquetProperties {
   public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+  public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
 
   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
 
@@ -83,10 +84,11 @@ public class ParquetProperties {
   private final boolean estimateNextSizeCheck;
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
+  private final int columnIndexTruncateLength;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -99,6 +101,7 @@ public class ParquetProperties {
     this.allocator = allocator;
 
     this.valuesWriterFactory = writerFactory;
+    this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -183,6 +186,10 @@ public class ParquetProperties {
     return valuesWriterFactory;
   }
 
+  public int getColumnIndexTruncateLength() {
+    return columnIndexTruncateLength;
+  }
+
   public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }
@@ -205,6 +212,7 @@ public class ParquetProperties {
     private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
+    private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
 
     private Builder() {
     }
@@ -299,11 +307,17 @@ public class ParquetProperties {
       return this;
     }
 
+    public Builder withColumnIndexTruncateLength(int length) {
+      Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length);
+      this.columnIndexTruncateLength = length;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties =
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory);
+          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
index 12ed7b4..950b70f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -58,6 +58,8 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
 
   private final List<Binary> minValues = new ArrayList<>();
   private final List<Binary> maxValues = new ArrayList<>();
+  private final BinaryTruncator truncator;
+  private final int truncateLength;
 
   private static Binary convert(ByteBuffer buffer) {
     return Binary.fromReusedByteBuffer(buffer);
@@ -67,6 +69,11 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
     return value.toByteBuffer();
   }
 
+  BinaryColumnIndexBuilder(PrimitiveType type, int truncateLength) {
+    truncator = BinaryTruncator.getTruncator(type);
+    this.truncateLength = truncateLength;
+  }
+
   @Override
   void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
     minValues.add(min == null ? null : convert(min));
@@ -75,8 +82,8 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
 
   @Override
   void addMinMax(Object min, Object max) {
-    minValues.add((Binary) min);
-    maxValues.add((Binary) max);
+    minValues.add(min == null ? null : truncator.truncateMin((Binary) min, truncateLength));
+    maxValues.add(max == null ? null : truncator.truncateMax((Binary) max, truncateLength));
   }
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
new file mode 100644
index 0000000..bcc43fb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Class for truncating min/max values for binary types.
+ */
+abstract class BinaryTruncator {
+  enum Validity {
+    VALID, MALFORMED, UNMAPPABLE;
+  }
+
+  private static class CharsetValidator {
+    private final CharBuffer dummyBuffer = CharBuffer.allocate(1024);
+    private final CharsetDecoder decoder;
+
+    CharsetValidator(Charset charset) {
+      decoder = charset.newDecoder();
+      decoder.onMalformedInput(CodingErrorAction.REPORT);
+      decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+    }
+
+    Validity checkValidity(ByteBuffer buffer) {
+      int pos = buffer.position();
+      CoderResult result = CoderResult.OVERFLOW;
+      while (result.isOverflow()) {
+        dummyBuffer.clear();
+        result = decoder.decode(buffer, dummyBuffer, true);
+      }
+      buffer.position(pos);
+      if (result.isUnderflow()) {
+        return Validity.VALID;
+      } else if (result.isMalformed()) {
+        return Validity.MALFORMED;
+      } else {
+        return Validity.UNMAPPABLE;
+      }
+    }
+  }
+
+  private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() {
+    @Override
+    Binary truncateMin(Binary minValue, int length) {
+      return minValue;
+    }
+
+    @Override
+    Binary truncateMax(Binary maxValue, int length) {
+      return maxValue;
+    }
+  };
+
+  private static final BinaryTruncator DEFAULT_UTF8_TRUNCATOR = new BinaryTruncator() {
+    private final CharsetValidator validator = new CharsetValidator(StandardCharsets.UTF_8);
+
+    @Override
+    Binary truncateMin(Binary minValue, int length) {
+      if (minValue.length() <= length) {
+        return minValue;
+      }
+      ByteBuffer buffer = minValue.toByteBuffer();
+      byte[] array;
+      if (validator.checkValidity(buffer) == Validity.VALID) {
+        array = truncateUtf8(buffer, length);
+      } else {
+        array = truncate(buffer, length);
+      }
+      return array == null ? minValue : Binary.fromConstantByteArray(array);
+    }
+
+    @Override
+    Binary truncateMax(Binary maxValue, int length) {
+      if (maxValue.length() <= length) {
+        return maxValue;
+      }
+      byte[] array;
+      ByteBuffer buffer = maxValue.toByteBuffer();
+      if (validator.checkValidity(buffer) == Validity.VALID) {
+        array = incrementUtf8(truncateUtf8(buffer, length));
+      } else {
+        array = increment(truncate(buffer, length));
+      }
+      return array == null ? maxValue : Binary.fromConstantByteArray(array);
+    }
+
+    // Simply truncate to length
+    private byte[] truncate(ByteBuffer buffer, int length) {
+      assert length < buffer.remaining();
+      byte[] array = new byte[length];
+      buffer.get(array);
+      return array;
+    }
+
+    // Trying to increment the bytes from the last one to the beginning
+    private byte[] increment(byte[] array) {
+      for (int i = array.length - 1; i >= 0; --i) {
+        byte elem = array[i];
+        ++elem;
+        array[i] = elem;
+        if (elem != 0) { // Did not overflow: 0xFF -> 0x00
+          return array;
+        }
+      }
+      return null;
+    }
+
+    // Truncates the buffer to length or less so the remaining bytes form a valid UTF-8 string
+    private byte[] truncateUtf8(ByteBuffer buffer, int length) {
+      assert length < buffer.remaining();
+      ByteBuffer newBuffer = buffer.slice();
+      newBuffer.limit(newBuffer.position() + length);
+      while (validator.checkValidity(newBuffer) != Validity.VALID) {
+        newBuffer.limit(newBuffer.limit() - 1);
+        if (newBuffer.remaining() == 0) {
+          return null;
+        }
+      }
+      byte[] array = new byte[newBuffer.remaining()];
+      newBuffer.get(array);
+      return array;
+    }
+
+    // Trying to increment the bytes from the last one to the beginning until the bytes form a valid UTF-8 string
+    private byte[] incrementUtf8(byte[] array) {
+      if (array == null) {
+        return null;
+      }
+      ByteBuffer buffer = ByteBuffer.wrap(array);
+      for (int i = array.length - 1; i >= 0; --i) {
+        byte prev = array[i];
+        byte inc = prev;
+        while (++inc != 0) { // Until overflow: 0xFF -> 0x00
+          array[i] = inc;
+          switch (validator.checkValidity(buffer)) {
+            case VALID:
+              return array;
+            case UNMAPPABLE:
+              continue; // Increment the i byte once more
+            case MALFORMED:
+              break; // Stop incrementing the i byte; go to the i-1
+          }
+          break; // MALFORMED
+        }
+        array[i] = prev;
+      }
+      return null; // All characters are the largest possible; unable to increment
+    }
+  };
+
+  static BinaryTruncator getTruncator(PrimitiveType type) {
+    if (type == null) {
+      return NO_OP_TRUNCATOR;
+    }
+    switch (type.getPrimitiveTypeName()) {
+      case INT96:
+        return NO_OP_TRUNCATOR;
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        OriginalType originalType = type.getOriginalType();
+        if (originalType == null) {
+          return DEFAULT_UTF8_TRUNCATOR;
+        }
+        switch (originalType) {
+          case UTF8:
+          case ENUM:
+          case JSON:
+          case BSON:
+            return DEFAULT_UTF8_TRUNCATOR;
+          default:
+            return NO_OP_TRUNCATOR;
+        }
+      default:
+        throw new IllegalArgumentException("No truncator is available for the type: " + type);
+    }
+  }
+
+  abstract Binary truncateMin(Binary minValue, int length);
+
+  abstract Binary truncateMax(Binary maxValue, int length);
+}
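
The truncateMax path above shortens the value to the target length and then increments the last byte that does not overflow, so the truncated value remains an upper bound of the original. A self-contained sketch of that idea for the non-UTF-8 case (class and method names are illustrative, not part of the commit):

import java.util.Arrays;

public class TruncateMaxSketch {
  // Cut to the target length, then bump the last byte that does not wrap
  // around from 0xFF to 0x00; if every byte wraps, fall back to the original.
  static byte[] truncateAndIncrement(byte[] value, int length) {
    if (value.length <= length) {
      return value; // short enough already
    }
    byte[] truncated = Arrays.copyOf(value, length);
    for (int i = truncated.length - 1; i >= 0; --i) {
      if (++truncated[i] != 0) {
        return truncated;
      }
    }
    return value; // all bytes were 0xFF: no shorter upper bound exists
  }

  public static void main(String[] args) {
    byte[] out = truncateAndIncrement(new byte[] {1, 2, 3, 4, 5}, 3);
    System.out.println(Arrays.toString(out)); // [1, 2, 4]
  }
}
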
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index aa0502b..6d05558 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -220,20 +220,22 @@ public abstract class ColumnIndexBuilder {
   /**
    * @param type
    *          the type this builder is to be created for
+   * @param truncateLength
+   *          the length to be used for truncating binary values if possible
    * @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects
    */
-  public static ColumnIndexBuilder getBuilder(PrimitiveType type) {
-    ColumnIndexBuilder builder = createNewBuilder(type.getPrimitiveTypeName());
+  public static ColumnIndexBuilder getBuilder(PrimitiveType type, int truncateLength) {
+    ColumnIndexBuilder builder = createNewBuilder(type, truncateLength);
     builder.type = type;
     return builder;
   }
 
-  private static ColumnIndexBuilder createNewBuilder(PrimitiveTypeName type) {
-    switch (type) {
+  private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int truncateLength) {
+    switch (type.getPrimitiveTypeName()) {
       case BINARY:
       case FIXED_LEN_BYTE_ARRAY:
       case INT96:
-        return new BinaryColumnIndexBuilder();
+        return new BinaryColumnIndexBuilder(type, truncateLength);
       case BOOLEAN:
         return new BooleanColumnIndexBuilder();
       case DOUBLE:
@@ -276,7 +278,7 @@ public abstract class ColumnIndexBuilder {
     PrimitiveTypeName typeName = type.getPrimitiveTypeName();
     ColumnIndexBuilder builder = BUILDERS.get(typeName);
     if (builder == null) {
-      builder = createNewBuilder(typeName);
+      builder = createNewBuilder(type, Integer.MAX_VALUE);
       BUILDERS.put(typeName, builder);
     }
 
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java
new file mode 100644
index 0000000..c3e3d85
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static org.apache.parquet.schema.OriginalType.BSON;
+import static org.apache.parquet.schema.OriginalType.DECIMAL;
+import static org.apache.parquet.schema.OriginalType.ENUM;
+import static org.apache.parquet.schema.OriginalType.INTERVAL;
+import static org.apache.parquet.schema.OriginalType.JSON;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.Random;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link BinaryTruncator}
+ */
+public class TestBinaryTruncator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBinaryTruncator.class);
+  private static final PrimitiveStringifier HEXA_STRINGIFIER = Types.required(BINARY)
+      .named("dummy_type").stringifier();
+  private static final Random RANDOM = new Random(42);
+  private static final CharsetDecoder UTF8_DECODER = StandardCharsets.UTF_8.newDecoder();
+  static {
+    UTF8_DECODER.onMalformedInput(CodingErrorAction.REPORT);
+    UTF8_DECODER.onUnmappableCharacter(CodingErrorAction.REPORT);
+  }
+
+  // The maximum values in UTF-8 for the 1, 2, 3 and 4 bytes representations
+  private static final String UTF8_1BYTE_MAX_CHAR = "\u007F";
+  private static final String UTF8_2BYTES_MAX_CHAR = "\u07FF";
+  private static final String UTF8_3BYTES_MAX_CHAR = "\uFFFF";
+  private static final String UTF8_4BYTES_MAX_CHAR = "\uDBFF\uDFFF";
+
+  @Test
+  public void testNonStringTruncate() {
+    BinaryTruncator truncator = BinaryTruncator
+        .getTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"));
+    assertEquals(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA),
+        truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 2));
+    assertEquals(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06),
+        truncator.truncateMax(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06), 2));
+  }
+
+  @Test
+  public void testContractNonStringTypes() {
+    testTruncator(
+        Types.required(FIXED_LEN_BYTE_ARRAY).length(8).as(DECIMAL).precision(18).scale(4).named("test_fixed_decimal"),
+        false);
+    testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("test_fixed_interval"), false);
+    testTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"), false);
+    testTruncator(Types.required(INT96).named("test_int96"), false);
+  }
+
+  @Test
+  public void testStringTruncate() {
+    BinaryTruncator truncator = BinaryTruncator.getTruncator(Types.required(BINARY).as(UTF8).named("test_utf8"));
+
+    // Truncate 1 byte characters
+    assertEquals(Binary.fromString("abc"), truncator.truncateMin(Binary.fromString("abcdef"), 3));
+    assertEquals(Binary.fromString("abd"), truncator.truncateMax(Binary.fromString("abcdef"), 3));
+
+    // Truncate 1-2 bytes characters; the target length is "inside" a UTF-8 character
+    assertEquals(Binary.fromString("árvízt"), truncator.truncateMin(Binary.fromString("árvíztűrő"), 9));
+    assertEquals(Binary.fromString("árvízu"), truncator.truncateMax(Binary.fromString("árvíztűrő"), 9));
+
+    // Truncate highest UTF-8 values -> unable to increment
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR),
+        truncator.truncateMin(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            5));
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+        truncator.truncateMax(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            5));
+
+    // Truncate highest UTF-8 values at the end -> increment the first possible character
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + "b"
+                + UTF8_3BYTES_MAX_CHAR),
+        truncator.truncateMax(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + "a"
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            10));
+
+    // Truncate invalid UTF-8 values -> truncate without validity check
+    assertEquals(binary(0xFF, 0xFE, 0xFD), truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3));
+    assertEquals(binary(0xFF, 0xFE, 0xFE), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3));
+    assertEquals(binary(0xFF, 0xFE, 0xFE, 0x00, 0x00), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFF, 0xFF, 0xFF), 5));
+  }
+
+  @Test
+  public void testContractStringTypes() {
+    testTruncator(Types.required(BINARY).named("test_binary"), true);
+    testTruncator(Types.required(BINARY).as(UTF8).named("test_utf8"), true);
+    testTruncator(Types.required(BINARY).as(ENUM).named("test_enum"), true);
+    testTruncator(Types.required(BINARY).as(JSON).named("test_json"), true);
+    testTruncator(Types.required(BINARY).as(BSON).named("test_bson"), true);
+    testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(5).named("test_fixed"), true);
+  }
+
+  private void testTruncator(PrimitiveType type, boolean strict) {
+    BinaryTruncator truncator = BinaryTruncator.getTruncator(type);
+    Comparator<Binary> comparator = type.comparator();
+
+    checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa"), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("árvíztűrő tükörfúrógép"), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa" + UTF8_3BYTES_MAX_CHAR), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("a" + UTF8_3BYTES_MAX_CHAR + UTF8_1BYTE_MAX_CHAR), strict,
+        strict);
+
+    checkContract(truncator, comparator,
+        Binary.fromConstantByteArray(new byte[] { (byte) 0xFE, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, (byte) 0xFF }), strict,
+        strict);
+
+    // Edge case: zero length -> unable to truncate
+    checkContract(truncator, comparator, Binary.fromString(""), false, false);
+    // Edge case: containing only UTF-8 max characters -> unable to truncate for max
+    checkContract(truncator, comparator, Binary.fromString(
+        UTF8_1BYTE_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR +
+            UTF8_2BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_1BYTE_MAX_CHAR +
+            UTF8_2BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR),
+        strict, false);
+    // Edge case: non-UTF-8; max bytes -> unable to truncate for max
+    checkContract(
+        truncator, comparator,
+        binary(0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF),
+        strict, false);
+  }
+
+  // Checks the contract of truncator
+  // strict means actual truncation is required and the truncated value is a valid UTF-8 string
+  private void checkContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, boolean strictMin,
+      boolean strictMax) {
+    int length = value.length();
+
+    // Edge cases: returning the original value if no truncation is required
+    assertSame(value, truncator.truncateMin(value, length));
+    assertSame(value, truncator.truncateMax(value, length));
+    assertSame(value, truncator.truncateMin(value, random(length + 1, length * 2 + 1)));
+    assertSame(value, truncator.truncateMax(value, random(length + 1, length * 2 + 1)));
+
+    if (length > 1) {
+      checkMinContract(truncator, comparator, value, length - 1, strictMin);
+      checkMaxContract(truncator, comparator, value, length - 1, strictMax);
+      checkMinContract(truncator, comparator, value, random(1, length - 1), strictMin);
+      checkMaxContract(truncator, comparator, value, random(1, length - 1), strictMax);
+    }
+
+    // Edge case: possible to truncate min value to 0 length if original value is not empty
+    checkMinContract(truncator, comparator, value, 0, strictMin);
+    // Edge case: impossible to truncate max value to 0 length -> returning the original value
+    assertSame(value, truncator.truncateMax(value, 0));
+  }
+
+  private void checkMinContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, int length,
+      boolean strict) {
+    Binary truncated = truncator.truncateMin(value, length);
+    LOG.debug("\"{}\" --truncMin({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(),
+        HEXA_STRINGIFIER.stringify(truncated));
+    assertTrue("truncatedMin(value) should be <= than value", comparator.compare(truncated, value) <= 0);
+    assertFalse("length of truncateMin(value) should not be > than the length of value",
+        truncated.length() > value.length());
+    if (isValidUtf8(value)) {
+      checkValidUtf8(truncated);
+    }
+    if (strict) {
+      assertTrue("length of truncateMin(value) ahould be < than the length of value",
+          truncated.length() < value.length());
+    }
+  }
+
+  private void checkMaxContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, int length,
+      boolean strict) {
+    Binary truncated = truncator.truncateMax(value, length);
+    LOG.debug("\"{}\" --truncMax({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(),
+        HEXA_STRINGIFIER.stringify(truncated));
+    assertTrue("truncatedMax(value) should be >= than value", comparator.compare(truncated, value) >= 0);
+    assertFalse("length of truncateMax(value) should not be > than the length of value",
+        truncated.length() > value.length());
+    if (isValidUtf8(value)) {
+      checkValidUtf8(truncated);
+    }
+    if (strict) {
+      assertTrue("length of truncateMax(value) ahould be < than the length of value",
+          truncated.length() < value.length());
+    }
+  }
+
+  private static boolean isValidUtf8(Binary binary) {
+    try {
+      UTF8_DECODER.decode(binary.toByteBuffer());
+      return true;
+    } catch (CharacterCodingException e) {
+      return false;
+    }
+  }
+
+  private static void checkValidUtf8(Binary binary) {
+    try {
+      UTF8_DECODER.decode(binary.toByteBuffer());
+    } catch (CharacterCodingException e) {
+      throw new AssertionError("Truncated value should be a valid UTF-8 string", e);
+    }
+  }
+
+  private static int random(int min, int max) {
+    return RANDOM.nextInt(max - min + 1) + min;
+  }
+
+  private static Binary binary(int... unsignedBytes) {
+    byte[] byteArray = new byte[unsignedBytes.length];
+    for (int i = 0, n = byteArray.length; i < n; ++i) {
+      int b = unsignedBytes[i];
+      assert (0xFFFFFF00 & b) == 0;
+      byteArray[i] = (byte) b;
+    }
+    return Binary.fromConstantByteArray(byteArray);
+  }
+
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 5acae97..7a5745e 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -65,7 +65,7 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildBinaryDecimal() {
     PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -103,7 +103,7 @@ public class TestColumnIndexBuilder {
         null,
         decimalBinary("87656273"));
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null));
     builder.add(sb.stats(type, decimalBinary("-9999293.23"), decimalBinary("-234.23")));
@@ -138,7 +138,7 @@ public class TestColumnIndexBuilder {
         decimalBinary("1234567890.12"),
         null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null));
     builder.add(sb.stats(type, null, null));
@@ -177,7 +177,7 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildBinaryUtf8() {
     PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -215,7 +215,7 @@ public class TestColumnIndexBuilder {
         stringBinary("Beeblebrox"),
         null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, stringBinary("Beeblebrox"), stringBinary("Dent"), null, null));
     builder.add(sb.stats(type, null, null));
@@ -250,7 +250,7 @@ public class TestColumnIndexBuilder {
         stringBinary("Slartibartfast"),
         null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, stringBinary("Slartibartfast")));
@@ -337,11 +337,11 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildBoolean() {
     PrimitiveType type = Types.required(BOOLEAN).named("test_boolean");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class));
     assertNull(builder.build());
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     StatsBuilder sb = new StatsBuilder();
     builder.add(sb.stats(type, false, true));
     builder.add(sb.stats(type, true, false, null));
@@ -357,7 +357,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false);
     assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, false, false));
@@ -375,7 +375,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null);
     assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, true, true));
@@ -413,7 +413,7 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildDouble() {
     PrimitiveType type = Types.required(DOUBLE).named("test_double");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -433,7 +433,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1);
     assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532.3, -345.2, null, null));
@@ -453,7 +453,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532.3, 345.2));
@@ -493,7 +493,7 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildFloat() {
     PrimitiveType type = Types.required(FLOAT).named("test_float");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(FloatColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -513,7 +513,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f);
     assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532.3f, -345.2f, null, null));
@@ -533,7 +533,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532.3f, 345.2f));
@@ -573,7 +573,7 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildInt32() {
     PrimitiveType type = Types.required(INT32).named("test_int32");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -593,7 +593,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), 10, 7, 2, null, 2, 8);
     assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532, -345, null, null));
@@ -613,7 +613,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532, 345));
@@ -653,7 +653,7 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildUInt8() {
     PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -673,7 +673,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA);
     assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, 0, 0, null, null));
@@ -693,7 +693,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null);
     assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 0xFF, 0xFF));
@@ -717,7 +717,7 @@ public class TestColumnIndexBuilder {
   @Test
   public void testBuildInt64() {
     PrimitiveType type = Types.required(INT64).named("test_int64");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     assertThat(builder, instanceOf(LongColumnIndexBuilder.class));
     assertNull(builder.build());
 
@@ -737,7 +737,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), 10l, 7l, 2l, null, 2l, 8l);
     assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null));
     builder.add(sb.stats(type, -532l, -345l, null, null));
@@ -757,7 +757,7 @@ public class TestColumnIndexBuilder {
     assertCorrectValues(columnIndex.getMaxValues(), null, -345l, -42l, null, null, 2l, null, 42l, null);
     assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null);
 
-    builder = ColumnIndexBuilder.getBuilder(type);
+    builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     sb = new StatsBuilder();
     builder.add(sb.stats(type, null, null, null, null, null));
     builder.add(sb.stats(type, 532l, 345l));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 0646493..29a353f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -76,12 +76,13 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
 
     private ColumnChunkPageWriter(ColumnDescriptor path,
                                   BytesCompressor compressor,
-                                  ByteBufferAllocator allocator) {
+                                  ByteBufferAllocator allocator,
+                                  int columnIndexTruncateLength) {
       this.path = path;
       this.compressor = compressor;
       this.allocator = allocator;
       this.buf = new ConcatenatingByteArrayCollector();
-      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType());
+      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
       this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
     }
 
@@ -273,10 +274,11 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
   private final MessageType schema;
 
-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator) {
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+      int columnIndexTruncateLength) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path,  new ColumnChunkPageWriter(path, compressor, allocator));
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength));
     }
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index d9e9b5e..1f8a093 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -20,7 +20,6 @@ package org.apache.parquet.hadoop;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
-import static java.lang.String.format;
 import static org.apache.parquet.Preconditions.checkNotNull;
 
 import java.io.IOException;
@@ -102,7 +101,8 @@ class InternalParquetRecordWriter<T> {
   }
 
   private void initStore() {
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator());
+    pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
+        props.getColumnIndexTruncateLength());
     columnStore = props.newColumnWriteStore(schema, pageStore);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     this.recordConsumer = columnIO.getRecordWriter(columnStore);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3c85b02..3a65624 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -47,6 +47,7 @@ import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
@@ -100,6 +101,7 @@ public class ParquetFileWriter {
   private final MessageType schema;
   private final PositionOutputStream out;
   private final AlignmentStrategy alignment;
+  private final int columnIndexTruncateLength;
 
   // file data
   private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
@@ -244,10 +246,27 @@ public class ParquetFileWriter {
    * @param rowGroupSize the row group size
    * @param maxPaddingSize the maximum padding
    * @throws IOException if the file can not be created
+   * @deprecated will be removed in 2.0.0
    */
+  @Deprecated
   public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
                            long rowGroupSize, int maxPaddingSize)
       throws IOException {
+    this(file, schema, mode, rowGroupSize, maxPaddingSize,
+        ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+  }
+  /**
+   * @param file OutputFile to create or overwrite
+   * @param schema the schema of the data
+   * @param mode file creation mode
+   * @param rowGroupSize the row group size
+   * @param maxPaddingSize the maximum padding
+   * @param columnIndexTruncateLength the length to which the min/max values in column indexes are truncated, if possible
+   * @throws IOException if the file can not be created
+   */
+  public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+                           long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength)
+      throws IOException {
     TypeUtil.checkValidWriteSchema(schema);
 
     this.schema = schema;
@@ -267,6 +286,7 @@ public class ParquetFileWriter {
     }
 
     this.encodingStatsBuilder = new EncodingStats.Builder();
+    this.columnIndexTruncateLength = columnIndexTruncateLength;
   }
 
   /**
@@ -289,6 +309,8 @@ public class ParquetFileWriter {
     this.out = HadoopStreams.wrap(
         fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
     this.encodingStatsBuilder = new EncodingStats.Builder();
+    // no truncation is needed for testing
+    this.columnIndexTruncateLength = Integer.MAX_VALUE;
   }
   /**
    * start the file
@@ -342,7 +364,7 @@ public class ParquetFileWriter {
     // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
     currentStatistics = null;
 
-    columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType);
+    columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
     offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
     firstPageOffset = -1;
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index ff5bab3..0789bf5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -143,6 +143,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
   public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min";
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
+  public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -312,6 +313,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
   }
 
+  public static void setColumnIndexTruncateLength(JobContext jobContext, int length) {
+    setColumnIndexTruncateLength(getConfiguration(jobContext), length);
+  }
+
+  public static void setColumnIndexTruncateLength(Configuration conf, int length) {
+    conf.setInt(COLUMN_INDEX_TRUNCATE_LENGTH, length);
+  }
+
+  private static int getColumnIndexTruncateLength(Configuration conf) {
+    return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+  }
+
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -366,6 +379,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
         .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
+        .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
         .build();
 
     long blockSize = getLongBlockSize(conf);
@@ -383,11 +397,12 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
       LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant"));
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
+      LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
     }
 
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
-        init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize);
+        init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength());
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
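
The same length can also be set through the Hadoop configuration when writing via ParquetOutputFormat; a brief sketch (the property key and the setter are the ones added above, the rest is standard Hadoop usage):

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.ParquetOutputFormat;

public class TruncateLengthConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Either set the new key directly...
    conf.setInt("parquet.columnindex.truncate.length", 32);
    // ...or use the equivalent setter added in this commit:
    ParquetOutputFormat.setColumnIndexTruncateLength(conf, 32);
  }
}
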
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index a32df39..5b0e4f8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -278,7 +278,7 @@ public class ParquetWriter<T> implements Closeable {
     MessageType schema = writeContext.getSchema();
 
     ParquetFileWriter fileWriter = new ParquetFileWriter(
-        file, schema, mode, rowGroupSize, maxPaddingSize);
+        file, schema, mode, rowGroupSize, maxPaddingSize, encodingProps.getColumnIndexTruncateLength());
     fileWriter.start();
 
     this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index b9382fc..90f4a5b 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -918,7 +918,7 @@ public class TestParquetMetadataConverter {
   @Test
   public void testColumnIndexConversion() {
     PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64");
-    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+    ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
     Statistics<?> stats = Statistics.createStats(type);
     stats.incrementNumNulls(16);
     stats.updateStats(-100l);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index b787268..9a27def 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -160,7 +160,8 @@ public class TestColumnChunkPageWriteStore {
       writer.startBlock(rowCount);
       pageOffset = outputFile.out().getPos();
       {
-        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator());
+        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema,
+            new HeapByteBufferAllocator(), Integer.MAX_VALUE);
         PageWriter pageWriter = store.getPageWriter(col);
         pageWriter.writePageV2(
             rowCount, nullCount, valueCount,
@@ -235,7 +236,7 @@ public class TestColumnChunkPageWriteStore {
     // TODO - look back at this, an allocator was being passed here in the ByteBuffer changes
     // see comment at this constructor
     ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
-        compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator());
+        compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator(), Integer.MAX_VALUE);
 
     for (ColumnDescriptor col : schema.getColumns()) {
       PageWriter pageWriter = store.getPageWriter(col);

