This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch column-indexes
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/column-indexes by this push:
new dc645db PARQUET-1214: Column indexes: Truncate min/max values (#481)
dc645db is described below
commit dc645db1395d5712cb10742a144bf348a72a6778
Author: Gabor Szadovszky <gabor@apache.org>
AuthorDate: Mon Jul 9 15:33:03 2018 +0200
PARQUET-1214: Column indexes: Truncate min/max values (#481)
---
.../apache/parquet/column/ParquetProperties.java | 18 +-
.../columnindex/BinaryColumnIndexBuilder.java | 11 +-
.../column/columnindex/BinaryTruncator.java | 208 +++++++++++++++
.../column/columnindex/ColumnIndexBuilder.java | 14 +-
.../column/columnindex/TestBinaryTruncator.java | 285 +++++++++++++++++++++
.../column/columnindex/TestColumnIndexBuilder.java | 50 ++--
.../parquet/hadoop/ColumnChunkPageWriteStore.java | 10 +-
.../hadoop/InternalParquetRecordWriter.java | 4 +-
.../apache/parquet/hadoop/ParquetFileWriter.java | 24 +-
.../apache/parquet/hadoop/ParquetOutputFormat.java | 17 +-
.../org/apache/parquet/hadoop/ParquetWriter.java | 2 +-
.../converter/TestParquetMetadataConverter.java | 2 +-
.../hadoop/TestColumnChunkPageWriteStore.java | 5 +-
13 files changed, 603 insertions(+), 47 deletions(-)
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index f965511..b173239 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -47,6 +47,7 @@ public class ParquetProperties {
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+ public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
@@ -83,10 +84,11 @@ public class ParquetProperties {
private final boolean estimateNextSizeCheck;
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;
+ private final int columnIndexTruncateLength;
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
- ValuesWriterFactory writerFactory) {
+ ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -99,6 +101,7 @@ public class ParquetProperties {
this.allocator = allocator;
this.valuesWriterFactory = writerFactory;
+ this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
}
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -183,6 +186,10 @@ public class ParquetProperties {
return valuesWriterFactory;
}
+  /**
+   * @return the length used for truncating the min/max values of the column indexes
+   */
+  public int getColumnIndexTruncateLength() {
+    return this.columnIndexTruncateLength;
+  }
+
public boolean estimateNextSizeCheck() {
return estimateNextSizeCheck;
}
@@ -205,6 +212,7 @@ public class ParquetProperties {
private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
+ private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private Builder() {
}
@@ -299,11 +307,17 @@ public class ParquetProperties {
return this;
}
+  /**
+   * Sets the length used for truncating the min/max values of the column indexes.
+   *
+   * @param length
+   *          the truncate length; must be positive
+   * @return this builder for method chaining
+   * @throws IllegalArgumentException if {@code length} is zero or negative
+   */
+  public Builder withColumnIndexTruncateLength(int length) {
+    Preconditions.checkArgument(length > 0, "Invalid column index min/max truncate length (negative or zero) : %s", length);
+    this.columnIndexTruncateLength = length;
+    return this;
+  }
+
public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
- estimateNextSizeCheck, allocator, valuesWriterFactory);
+ estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
index 12ed7b4..950b70f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryColumnIndexBuilder.java
@@ -58,6 +58,8 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
private final List<Binary> minValues = new ArrayList<>();
private final List<Binary> maxValues = new ArrayList<>();
+ private final BinaryTruncator truncator;
+ private final int truncateLength;
private static Binary convert(ByteBuffer buffer) {
return Binary.fromReusedByteBuffer(buffer);
@@ -67,6 +69,11 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
return value.toByteBuffer();
}
+  /**
+   * @param type
+   *          the column type; selects the {@link BinaryTruncator} applied to the values passed to addMinMax
+   * @param truncateLength
+   *          the length those min/max values are truncated to (if the selected truncator supports it)
+   */
+  BinaryColumnIndexBuilder(PrimitiveType type, int truncateLength) {
+    this.truncateLength = truncateLength;
+    this.truncator = BinaryTruncator.getTruncator(type);
+  }
+
@Override
void addMinMaxFromBytes(ByteBuffer min, ByteBuffer max) {
minValues.add(min == null ? null : convert(min));
@@ -75,8 +82,8 @@ class BinaryColumnIndexBuilder extends ColumnIndexBuilder {
@Override
void addMinMax(Object min, Object max) {
- minValues.add((Binary) min);
- maxValues.add((Binary) max);
+ minValues.add(min == null ? null : truncator.truncateMin((Binary) min, truncateLength));
+ maxValues.add(max == null ? null : truncator.truncateMax((Binary) max, truncateLength));
}
@Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
new file mode 100644
index 0000000..bcc43fb
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CoderResult;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * Class for truncating min/max values for binary types.
+ * <p>
+ * {@link #truncateMin(Binary, int)} returns a value lower than or equal to the original one while
+ * {@link #truncateMax(Binary, int)} returns a value greater than or equal to the original one. The truncators
+ * returned by {@link #getTruncator(PrimitiveType)} are shared instances and are safe to use from multiple threads.
+ */
+abstract class BinaryTruncator {
+  enum Validity {
+    VALID, MALFORMED, UNMAPPABLE;
+  }
+
+  /**
+   * Checks whether the remaining bytes of a buffer are valid in a specific charset.
+   * <p>
+   * Not thread-safe: it holds a stateful {@link CharsetDecoder} and a reusable scratch buffer.
+   */
+  private static class CharsetValidator {
+    private final CharBuffer dummyBuffer = CharBuffer.allocate(1024);
+    private final CharsetDecoder decoder;
+
+    CharsetValidator(Charset charset) {
+      decoder = charset.newDecoder();
+      decoder.onMalformedInput(CodingErrorAction.REPORT);
+      decoder.onUnmappableCharacter(CodingErrorAction.REPORT);
+    }
+
+    // Decodes the remaining bytes of buffer and classifies the outcome; the position of buffer is restored
+    Validity checkValidity(ByteBuffer buffer) {
+      int pos = buffer.position();
+      decoder.reset(); // start every check from a clean decoder state
+      CoderResult result = CoderResult.OVERFLOW;
+      while (result.isOverflow()) {
+        // The decoded characters are not needed; dummyBuffer is only a sink that is emptied on overflow
+        dummyBuffer.clear();
+        result = decoder.decode(buffer, dummyBuffer, true);
+      }
+      buffer.position(pos);
+      if (result.isUnderflow()) {
+        return Validity.VALID;
+      } else if (result.isMalformed()) {
+        return Validity.MALFORMED;
+      } else {
+        return Validity.UNMAPPABLE;
+      }
+    }
+  }
+
+  private static final BinaryTruncator NO_OP_TRUNCATOR = new BinaryTruncator() {
+    @Override
+    Binary truncateMin(Binary minValue, int length) {
+      return minValue;
+    }
+
+    @Override
+    Binary truncateMax(Binary maxValue, int length) {
+      return maxValue;
+    }
+  };
+
+  private static final BinaryTruncator DEFAULT_UTF8_TRUNCATOR = new BinaryTruncator() {
+    // This truncator is a shared singleton but CharsetValidator is not thread-safe; keep one validator per thread
+    // so concurrent writers cannot corrupt each other's decoder state
+    private final ThreadLocal<CharsetValidator> validator = new ThreadLocal<CharsetValidator>() {
+      @Override
+      protected CharsetValidator initialValue() {
+        return new CharsetValidator(StandardCharsets.UTF_8);
+      }
+    };
+
+    private Validity checkValidity(ByteBuffer buffer) {
+      return validator.get().checkValidity(buffer);
+    }
+
+    @Override
+    Binary truncateMin(Binary minValue, int length) {
+      if (minValue.length() <= length) {
+        return minValue;
+      }
+      ByteBuffer buffer = minValue.toByteBuffer();
+      byte[] array;
+      if (checkValidity(buffer) == Validity.VALID) {
+        array = truncateUtf8(buffer, length);
+      } else {
+        array = truncate(buffer, length);
+      }
+      return Binary.fromConstantByteArray(array);
+    }
+
+    @Override
+    Binary truncateMax(Binary maxValue, int length) {
+      if (maxValue.length() <= length) {
+        return maxValue;
+      }
+      ByteBuffer buffer = maxValue.toByteBuffer();
+      byte[] array;
+      if (checkValidity(buffer) == Validity.VALID) {
+        array = incrementUtf8(truncateUtf8(buffer, length));
+      } else {
+        array = increment(truncate(buffer, length));
+      }
+      // null means no bytes could be incremented, so no shorter upper bound exists; keep the original value
+      return array == null ? maxValue : Binary.fromConstantByteArray(array);
+    }
+
+    // Simply truncates to length without interpreting the bytes
+    private byte[] truncate(ByteBuffer buffer, int length) {
+      assert length < buffer.remaining();
+      byte[] array = new byte[length];
+      buffer.get(array);
+      return array;
+    }
+
+    // Increments the bytes from the last one to the beginning (with carry); returns null if all of them overflowed
+    private byte[] increment(byte[] array) {
+      for (int i = array.length - 1; i >= 0; --i) {
+        byte elem = array[i];
+        ++elem;
+        array[i] = elem;
+        if (elem != 0) { // Did not overflow: 0xFF -> 0x00
+          return array;
+        }
+      }
+      return null;
+    }
+
+    // Truncates the buffer to length or less so the remaining bytes form a valid UTF-8 string. The buffer is expected
+    // to hold valid UTF-8, so only the bytes of a character cut in half have to be dropped. An empty array is
+    // returned if not even the first character fits into length bytes (an empty value is still a valid lower bound).
+    private byte[] truncateUtf8(ByteBuffer buffer, int length) {
+      assert length < buffer.remaining();
+      ByteBuffer newBuffer = buffer.slice();
+      newBuffer.limit(newBuffer.position() + length);
+      while (newBuffer.hasRemaining() && checkValidity(newBuffer) != Validity.VALID) {
+        newBuffer.limit(newBuffer.limit() - 1);
+      }
+      byte[] array = new byte[newBuffer.remaining()];
+      newBuffer.get(array);
+      return array;
+    }
+
+    // Tries to increment the bytes from the last one to the beginning until they form a valid UTF-8 string. A byte
+    // is incremented until the result is valid (returned), malformed or the byte overflows; in the latter two cases
+    // the byte is restored and the previous one is tried. Returns null if no valid larger value is found (e.g. the
+    // array is empty or contains only the largest possible characters).
+    private byte[] incrementUtf8(byte[] array) {
+      ByteBuffer buffer = ByteBuffer.wrap(array);
+      for (int i = array.length - 1; i >= 0; --i) {
+        byte prev = array[i];
+        byte inc = prev;
+        while (++inc != 0) { // Until overflow: 0xFF -> 0x00
+          array[i] = inc;
+          Validity validity = checkValidity(buffer);
+          if (validity == Validity.VALID) {
+            return array;
+          }
+          if (validity == Validity.MALFORMED) {
+            break; // Stop incrementing the i byte; go to the i-1
+          }
+          // UNMAPPABLE: increment the i byte once more
+        }
+        array[i] = prev;
+      }
+      return null; // All characters are the largest possible; unable to increment
+    }
+  };
+
+  /**
+   * @param type
+   *          the type of the column the min/max values belong to; {@code null} is accepted
+   * @return the truncator to be used for the specified type; a no-op one if the values of the type cannot be
+   *         truncated safely (e.g. INT96 or decimal values)
+   * @throws IllegalArgumentException if the type is not BINARY, FIXED_LEN_BYTE_ARRAY or INT96
+   */
+  static BinaryTruncator getTruncator(PrimitiveType type) {
+    if (type == null) {
+      return NO_OP_TRUNCATOR;
+    }
+    switch (type.getPrimitiveTypeName()) {
+      case INT96:
+        return NO_OP_TRUNCATOR;
+      case BINARY:
+      case FIXED_LEN_BYTE_ARRAY:
+        OriginalType originalType = type.getOriginalType();
+        if (originalType == null) {
+          return DEFAULT_UTF8_TRUNCATOR;
+        }
+        switch (originalType) {
+          case UTF8:
+          case ENUM:
+          case JSON:
+          case BSON:
+            return DEFAULT_UTF8_TRUNCATOR;
+          default:
+            return NO_OP_TRUNCATOR;
+        }
+      default:
+        throw new IllegalArgumentException("No truncator is available for the type: " + type);
+    }
+  }
+
+  /**
+   * @param minValue
+   *          the min value to be truncated; must not be {@code null}
+   * @param length
+   *          the maximum length in bytes of the truncated value
+   * @return a value lower than or equal to {@code minValue}; it is truncated to at most {@code length} bytes if
+   *         the truncator supports it, otherwise {@code minValue} itself is returned
+   */
+  abstract Binary truncateMin(Binary minValue, int length);
+
+  /**
+   * @param maxValue
+   *          the max value to be truncated; must not be {@code null}
+   * @param length
+   *          the maximum length in bytes of the truncated value
+   * @return a value greater than or equal to {@code maxValue} that is at most {@code length} bytes long if such a
+   *         value can be produced, otherwise {@code maxValue} itself is returned
+   */
+  abstract Binary truncateMax(Binary maxValue, int length);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
index aa0502b..6d05558 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/ColumnIndexBuilder.java
@@ -220,20 +220,22 @@ public abstract class ColumnIndexBuilder {
/**
* @param type
* the type this builder is to be created for
+ * @param truncateLength
+ * the maximum length (in bytes) of the min/max values for binary-based types (BINARY, FIXED_LEN_BYTE_ARRAY,
+ * INT96); longer values are truncated if the type allows it (e.g. INT96 and decimal values are kept as is)
* @return a {@link ColumnIndexBuilder} instance to be used for creating {@link ColumnIndex} objects
*/
- public static ColumnIndexBuilder getBuilder(PrimitiveType type) {
- ColumnIndexBuilder builder = createNewBuilder(type.getPrimitiveTypeName());
+ public static ColumnIndexBuilder getBuilder(PrimitiveType type, int truncateLength) {
+ ColumnIndexBuilder builder = createNewBuilder(type, truncateLength);
builder.type = type;
return builder;
}
- private static ColumnIndexBuilder createNewBuilder(PrimitiveTypeName type) {
- switch (type) {
+ private static ColumnIndexBuilder createNewBuilder(PrimitiveType type, int truncateLength) {
+ switch (type.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
case INT96:
- return new BinaryColumnIndexBuilder();
+ return new BinaryColumnIndexBuilder(type, truncateLength);
case BOOLEAN:
return new BooleanColumnIndexBuilder();
case DOUBLE:
@@ -276,7 +278,7 @@ public abstract class ColumnIndexBuilder {
PrimitiveTypeName typeName = type.getPrimitiveTypeName();
ColumnIndexBuilder builder = BUILDERS.get(typeName);
if (builder == null) {
- builder = createNewBuilder(typeName);
+ builder = createNewBuilder(type, Integer.MAX_VALUE);
BUILDERS.put(typeName, builder);
}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java
new file mode 100644
index 0000000..c3e3d85
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestBinaryTruncator.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.internal.column.columnindex;
+
+import static org.apache.parquet.schema.OriginalType.BSON;
+import static org.apache.parquet.schema.OriginalType.DECIMAL;
+import static org.apache.parquet.schema.OriginalType.ENUM;
+import static org.apache.parquet.schema.OriginalType.INTERVAL;
+import static org.apache.parquet.schema.OriginalType.JSON;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetDecoder;
+import java.nio.charset.CodingErrorAction;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.Random;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveStringifier;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for {@link BinaryTruncator}
+ */
+public class TestBinaryTruncator {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestBinaryTruncator.class);
+  private static final PrimitiveStringifier HEXA_STRINGIFIER = Types.required(BINARY)
+      .named("dummy_type").stringifier();
+  private static final Random RANDOM = new Random(42);
+  private static final CharsetDecoder UTF8_DECODER = StandardCharsets.UTF_8.newDecoder();
+  static {
+    UTF8_DECODER.onMalformedInput(CodingErrorAction.REPORT);
+    UTF8_DECODER.onUnmappableCharacter(CodingErrorAction.REPORT);
+  }
+
+  // The maximum values in UTF-8 for the 1, 2, 3 and 4 bytes representations
+  private static final String UTF8_1BYTE_MAX_CHAR = "\u007F";
+  private static final String UTF8_2BYTES_MAX_CHAR = "\u07FF";
+  private static final String UTF8_3BYTES_MAX_CHAR = "\uFFFF";
+  private static final String UTF8_4BYTES_MAX_CHAR = "\uDBFF\uDFFF";
+
+  @Test
+  public void testNonStringTruncate() {
+    BinaryTruncator truncator = BinaryTruncator
+        .getTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"));
+    assertEquals(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA),
+        truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 2));
+    assertEquals(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06),
+        truncator.truncateMax(binary(0x01, 0x02, 0x03, 0x04, 0x05, 0x06), 2));
+  }
+
+  @Test
+  public void testContractNonStringTypes() {
+    testTruncator(
+        Types.required(FIXED_LEN_BYTE_ARRAY).length(8).as(DECIMAL).precision(18).scale(4).named("test_fixed_decimal"),
+        false);
+    testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("test_fixed_interval"), false);
+    testTruncator(Types.required(BINARY).as(DECIMAL).precision(10).scale(2).named("test_binary_decimal"), false);
+    testTruncator(Types.required(INT96).named("test_int96"), false);
+  }
+
+  @Test
+  public void testStringTruncate() {
+    BinaryTruncator truncator = BinaryTruncator.getTruncator(Types.required(BINARY).as(UTF8).named("test_utf8"));
+
+    // Truncate 1 byte characters
+    assertEquals(Binary.fromString("abc"), truncator.truncateMin(Binary.fromString("abcdef"), 3));
+    assertEquals(Binary.fromString("abd"), truncator.truncateMax(Binary.fromString("abcdef"), 3));
+
+    // Truncate 1-2 bytes characters; the target length is "inside" a UTF-8 character
+    assertEquals(Binary.fromString("árvízt"), truncator.truncateMin(Binary.fromString("árvíztűrő"), 9));
+    assertEquals(Binary.fromString("árvízu"), truncator.truncateMax(Binary.fromString("árvíztűrő"), 9));
+
+    // Truncate highest UTF-8 values -> unable to increment
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR),
+        truncator.truncateMin(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            5));
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+        truncator.truncateMax(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            5));
+
+    // Truncate highest UTF-8 values at the end -> increment the first possible character
+    assertEquals(
+        Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + "b"
+                + UTF8_3BYTES_MAX_CHAR),
+        truncator.truncateMax(Binary.fromString(
+            UTF8_1BYTE_MAX_CHAR
+                + UTF8_2BYTES_MAX_CHAR
+                + "a"
+                + UTF8_3BYTES_MAX_CHAR
+                + UTF8_4BYTES_MAX_CHAR),
+            10));
+
+    // Truncate invalid UTF-8 values -> truncate without validity check
+    assertEquals(binary(0xFF, 0xFE, 0xFD), truncator.truncateMin(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3));
+    assertEquals(binary(0xFF, 0xFE, 0xFE), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA), 3));
+    assertEquals(binary(0xFF, 0xFE, 0xFE, 0x00, 0x00), truncator.truncateMax(binary(0xFF, 0xFE, 0xFD, 0xFF, 0xFF, 0xFF), 5));
+  }
+
+  @Test
+  public void testContractStringTypes() {
+    testTruncator(Types.required(BINARY).named("test_binary"), true);
+    testTruncator(Types.required(BINARY).as(UTF8).named("test_utf8"), true);
+    testTruncator(Types.required(BINARY).as(ENUM).named("test_enum"), true);
+    testTruncator(Types.required(BINARY).as(JSON).named("test_json"), true);
+    testTruncator(Types.required(BINARY).as(BSON).named("test_bson"), true);
+    testTruncator(Types.required(FIXED_LEN_BYTE_ARRAY).length(5).named("test_fixed"), true);
+  }
+
+  // Checks the contract of the truncator of the type for typical and edge case values; strict is passed to checkContract
+  private void testTruncator(PrimitiveType type, boolean strict) {
+    BinaryTruncator truncator = BinaryTruncator.getTruncator(type);
+    Comparator<Binary> comparator = type.comparator();
+
+    checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa"), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("árvíztűrő tükörfúrógép"), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("aaaaaaaaaa" + UTF8_3BYTES_MAX_CHAR), strict, strict);
+    checkContract(truncator, comparator, Binary.fromString("a" + UTF8_3BYTES_MAX_CHAR + UTF8_1BYTE_MAX_CHAR), strict,
+        strict);
+
+    checkContract(truncator, comparator,
+        Binary.fromConstantByteArray(new byte[] { (byte) 0xFE, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, (byte) 0xFF }), strict,
+        strict);
+
+    // Edge case: zero length -> unable to truncate
+    checkContract(truncator, comparator, Binary.fromString(""), false, false);
+    // Edge case: containing only UTF-8 max characters -> unable to truncate for max
+    checkContract(truncator, comparator, Binary.fromString(
+        UTF8_1BYTE_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR +
+            UTF8_2BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_1BYTE_MAX_CHAR +
+            UTF8_2BYTES_MAX_CHAR +
+            UTF8_3BYTES_MAX_CHAR +
+            UTF8_4BYTES_MAX_CHAR),
+        strict, false);
+    // Edge case: non-UTF-8; max bytes -> unable to truncate for max
+    checkContract(
+        truncator, comparator,
+        binary(0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF),
+        strict, false);
+  }
+
+  // Checks the contract of the truncator for the specified value.
+  // strictMin/strictMax mean that actual truncation is required, i.e. the truncated value has to be shorter than the
+  // original one. (The UTF-8 validity of the result is checked whenever the original value is valid UTF-8.)
+  private void checkContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, boolean strictMin,
+      boolean strictMax) {
+    int length = value.length();
+
+    // Edge cases: returning the original value if no truncation is required
+    assertSame(value, truncator.truncateMin(value, length));
+    assertSame(value, truncator.truncateMax(value, length));
+    assertSame(value, truncator.truncateMin(value, random(length + 1, length * 2 + 1)));
+    assertSame(value, truncator.truncateMax(value, random(length + 1, length * 2 + 1)));
+
+    if (length > 1) {
+      checkMinContract(truncator, comparator, value, length - 1, strictMin);
+      checkMaxContract(truncator, comparator, value, length - 1, strictMax);
+      checkMinContract(truncator, comparator, value, random(1, length - 1), strictMin);
+      checkMaxContract(truncator, comparator, value, random(1, length - 1), strictMax);
+    }
+
+    // Edge case: possible to truncate min value to 0 length if original value is not empty
+    checkMinContract(truncator, comparator, value, 0, strictMin);
+    // Edge case: impossible to truncate max value to 0 length -> returning the original value
+    assertSame(value, truncator.truncateMax(value, 0));
+  }
+
+  private void checkMinContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, int length,
+      boolean strict) {
+    Binary truncated = truncator.truncateMin(value, length);
+    LOG.debug("\"{}\" --truncMin({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(),
+        HEXA_STRINGIFIER.stringify(truncated));
+    assertTrue("truncatedMin(value) should be <= value", comparator.compare(truncated, value) <= 0);
+    assertFalse("length of truncateMin(value) should not be > the length of value",
+        truncated.length() > value.length());
+    if (isValidUtf8(value)) {
+      checkValidUtf8(truncated);
+    }
+    if (strict) {
+      assertTrue("length of truncateMin(value) should be < the length of value",
+          truncated.length() < value.length());
+    }
+  }
+
+  private void checkMaxContract(BinaryTruncator truncator, Comparator<Binary> comparator, Binary value, int length,
+      boolean strict) {
+    Binary truncated = truncator.truncateMax(value, length);
+    LOG.debug("\"{}\" --truncMax({})--> \"{}\" [{}]", value.toStringUsingUTF8(), length, truncated.toStringUsingUTF8(),
+        HEXA_STRINGIFIER.stringify(truncated));
+    assertTrue("truncatedMax(value) should be >= value", comparator.compare(truncated, value) >= 0);
+    assertFalse("length of truncateMax(value) should not be > the length of value",
+        truncated.length() > value.length());
+    if (isValidUtf8(value)) {
+      checkValidUtf8(truncated);
+    }
+    if (strict) {
+      assertTrue("length of truncateMax(value) should be < the length of value",
+          truncated.length() < value.length());
+    }
+  }
+
+  private static boolean isValidUtf8(Binary binary) {
+    try {
+      UTF8_DECODER.decode(binary.toByteBuffer());
+      return true;
+    } catch (CharacterCodingException e) {
+      return false;
+    }
+  }
+
+  private static void checkValidUtf8(Binary binary) {
+    try {
+      UTF8_DECODER.decode(binary.toByteBuffer());
+    } catch (CharacterCodingException e) {
+      throw new AssertionError("Truncated value should be a valid UTF-8 string", e);
+    }
+  }
+
+  // Returns a random int between min and max (both inclusive)
+  private static int random(int min, int max) {
+    return RANDOM.nextInt(max - min + 1) + min;
+  }
+
+  // Creates a Binary from the specified unsigned byte values (0x00 - 0xFF)
+  private static Binary binary(int... unsignedBytes) {
+    byte[] byteArray = new byte[unsignedBytes.length];
+    for (int i = 0, n = byteArray.length; i < n; ++i) {
+      int b = unsignedBytes[i];
+      assert (0xFFFFFF00 & b) == 0;
+      byteArray[i] = (byte) b;
+    }
+    return Binary.fromConstantByteArray(byteArray);
+  }
+
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
index 5acae97..7a5745e 100644
--- a/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/internal/column/columnindex/TestColumnIndexBuilder.java
@@ -65,7 +65,7 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildBinaryDecimal() {
PrimitiveType type = Types.required(BINARY).as(DECIMAL).precision(12).scale(2).named("test_binary_decimal");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
@@ -103,7 +103,7 @@ public class TestColumnIndexBuilder {
null,
decimalBinary("87656273"));
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null, null, null));
builder.add(sb.stats(type, decimalBinary("-9999293.23"), decimalBinary("-234.23")));
@@ -138,7 +138,7 @@ public class TestColumnIndexBuilder {
decimalBinary("1234567890.12"),
null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null, null));
builder.add(sb.stats(type, null, null));
@@ -177,7 +177,7 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildBinaryUtf8() {
PrimitiveType type = Types.required(BINARY).as(UTF8).named("test_binary_utf8");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(BinaryColumnIndexBuilder.class));
assertNull(builder.build());
@@ -215,7 +215,7 @@ public class TestColumnIndexBuilder {
stringBinary("Beeblebrox"),
null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, stringBinary("Beeblebrox"), stringBinary("Dent"), null, null));
builder.add(sb.stats(type, null, null));
@@ -250,7 +250,7 @@ public class TestColumnIndexBuilder {
stringBinary("Slartibartfast"),
null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, stringBinary("Slartibartfast")));
@@ -337,11 +337,11 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildBoolean() {
PrimitiveType type = Types.required(BOOLEAN).named("test_boolean");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(BooleanColumnIndexBuilder.class));
assertNull(builder.build());
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
StatsBuilder sb = new StatsBuilder();
builder.add(sb.stats(type, false, true));
builder.add(sb.stats(type, true, false, null));
@@ -357,7 +357,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), true, true, true, null, false);
assertCorrectValues(columnIndex.getMinValues(), false, false, true, null, false);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, false, false));
@@ -375,7 +375,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), null, false, null, null, true, true, null);
assertCorrectValues(columnIndex.getMinValues(), null, false, null, null, false, false, null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, true, true));
@@ -413,7 +413,7 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildDouble() {
PrimitiveType type = Types.required(DOUBLE).named("test_double");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(DoubleColumnIndexBuilder.class));
assertNull(builder.build());
@@ -433,7 +433,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), -4.1, 7.0, 2.2, null, 2.32, 8.1);
assertCorrectValues(columnIndex.getMinValues(), -4.2, -11.7, 2.2, null, 1.9, -21.0);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, -532.3, -345.2, null, null));
@@ -453,7 +453,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), null, -345.2, -234.6, null, null, 2.99999, null, 42.83, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532.3, -234.7, null, null, -234.6, null, 3.0, null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null, null, null, null));
builder.add(sb.stats(type, 532.3, 345.2));
@@ -493,7 +493,7 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildFloat() {
PrimitiveType type = Types.required(FLOAT).named("test_float");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(FloatColumnIndexBuilder.class));
assertNull(builder.build());
@@ -513,7 +513,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), -4.1f, 7.0f, 2.2f, null, 2.32f, 8.1f);
assertCorrectValues(columnIndex.getMinValues(), -4.2f, -11.7f, 2.2f, null, 1.9f, -21.0f);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, -532.3f, -345.2f, null, null));
@@ -533,7 +533,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), null, -345.2f, -234.7f, null, null, 2.99999f, null, 42.83f, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532.3f, -300.6f, null, null, -234.6f, null, 3.0f, null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null, null, null, null));
builder.add(sb.stats(type, 532.3f, 345.2f));
@@ -573,7 +573,7 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildInt32() {
PrimitiveType type = Types.required(INT32).named("test_int32");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
assertNull(builder.build());
@@ -593,7 +593,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), 10, 7, 2, null, 2, 8);
assertCorrectValues(columnIndex.getMinValues(), -4, -11, 2, null, 1, -21);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, -532, -345, null, null));
@@ -613,7 +613,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), null, -345, -42, null, null, 2, null, 42, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532, -500, null, null, -42, null, 3, null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null, null, null, null));
builder.add(sb.stats(type, 532, 345));
@@ -653,7 +653,7 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildUInt8() {
PrimitiveType type = Types.required(INT32).as(UINT_8).named("test_uint8");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(IntColumnIndexBuilder.class));
assertNull(builder.build());
@@ -673,7 +673,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), 10, 17, 2, null, 0xFF, 0xFA);
assertCorrectValues(columnIndex.getMinValues(), 4, 11, 2, null, 1, 0xEF);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, 0, 0, null, null));
@@ -693,7 +693,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), null, 0, 42, null, null, 0xEE, null, 0xFF, null);
assertCorrectValues(columnIndex.getMinValues(), null, 0, 0, null, null, 42, null, 0xEF, null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null, null, null, null));
builder.add(sb.stats(type, 0xFF, 0xFF));
@@ -717,7 +717,7 @@ public class TestColumnIndexBuilder {
@Test
public void testBuildInt64() {
PrimitiveType type = Types.required(INT64).named("test_int64");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
assertThat(builder, instanceOf(LongColumnIndexBuilder.class));
assertNull(builder.build());
@@ -737,7 +737,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), 10l, 7l, 2l, null, 2l, 8l);
assertCorrectValues(columnIndex.getMinValues(), -4l, -11l, 2l, null, 1l, -21l);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null));
builder.add(sb.stats(type, -532l, -345l, null, null));
@@ -757,7 +757,7 @@ public class TestColumnIndexBuilder {
assertCorrectValues(columnIndex.getMaxValues(), null, -345l, -42l, null, null, 2l, null, 42l, null);
assertCorrectValues(columnIndex.getMinValues(), null, -532l, -234l, null, null, -42l, null, -3l, null);
- builder = ColumnIndexBuilder.getBuilder(type);
+ builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
sb = new StatsBuilder();
builder.add(sb.stats(type, null, null, null, null, null));
builder.add(sb.stats(type, 532l, 345l));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 0646493..29a353f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -76,12 +76,13 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
private ColumnChunkPageWriter(ColumnDescriptor path,
BytesCompressor compressor,
- ByteBufferAllocator allocator) {
+ ByteBufferAllocator allocator,
+ int columnIndexTruncateLength) {
this.path = path;
this.compressor = compressor;
this.allocator = allocator;
this.buf = new ConcatenatingByteArrayCollector();
- this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType());
+ this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
}
@@ -273,10 +274,11 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
private final MessageType schema;
- public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator) {
+ public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+ int columnIndexTruncateLength) {
this.schema = schema;
for (ColumnDescriptor path : schema.getColumns()) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator));
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength));
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index d9e9b5e..1f8a093 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -20,7 +20,6 @@ package org.apache.parquet.hadoop;
import static java.lang.Math.max;
import static java.lang.Math.min;
-import static java.lang.String.format;
import static org.apache.parquet.Preconditions.checkNotNull;
import java.io.IOException;
@@ -102,7 +101,8 @@ class InternalParquetRecordWriter<T> {
}
private void initStore() {
- pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator());
+ pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
+ props.getColumnIndexTruncateLength());
columnStore = props.newColumnWriteStore(schema, pageStore);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
this.recordConsumer = columnIO.getRecordWriter(columnStore);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3c85b02..3a65624 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -47,6 +47,7 @@ import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
@@ -100,6 +101,7 @@ public class ParquetFileWriter {
private final MessageType schema;
private final PositionOutputStream out;
private final AlignmentStrategy alignment;
+ private final int columnIndexTruncateLength;
// file data
private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
@@ -244,10 +246,27 @@ public class ParquetFileWriter {
* @param rowGroupSize the row group size
* @param maxPaddingSize the maximum padding
* @throws IOException if the file can not be created
+ * @deprecated will be removed in 2.0.0
*/
+ @Deprecated
public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
long rowGroupSize, int maxPaddingSize)
throws IOException {
+ this(file, schema, mode, rowGroupSize, maxPaddingSize,
+ ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+ }
+ /**
+ * @param file OutputFile to create or overwrite
+ * @param schema the schema of the data
+ * @param mode file creation mode
+ * @param rowGroupSize the row group size
+ * @param maxPaddingSize the maximum padding
+ * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
+ * @throws IOException if the file can not be created
+ */
+ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
+ long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength)
+ throws IOException {
TypeUtil.checkValidWriteSchema(schema);
this.schema = schema;
@@ -267,6 +286,7 @@ public class ParquetFileWriter {
}
this.encodingStatsBuilder = new EncodingStats.Builder();
+ this.columnIndexTruncateLength = columnIndexTruncateLength;
}
/**
@@ -289,6 +309,8 @@ public class ParquetFileWriter {
this.out = HadoopStreams.wrap(
fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
this.encodingStatsBuilder = new EncodingStats.Builder();
+ // no truncation is needed for testing
+ this.columnIndexTruncateLength = Integer.MAX_VALUE;
}
/**
* start the file
@@ -342,7 +364,7 @@ public class ParquetFileWriter {
// The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
currentStatistics = null;
- columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType);
+ columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
firstPageOffset = -1;
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index ff5bab3..0789bf5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -143,6 +143,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
public static final String MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.min";
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
+ public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -312,6 +313,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
return conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
}
+ public static void setColumnIndexTruncateLength(JobContext jobContext, int length) {
+ setColumnIndexTruncateLength(getConfiguration(jobContext), length);
+ }
+
+ public static void setColumnIndexTruncateLength(Configuration conf, int length) {
+ conf.setInt(COLUMN_INDEX_TRUNCATE_LENGTH, length);
+ }
+
+ private static int getColumnIndexTruncateLength(Configuration conf) {
+ return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+ }
+
private WriteSupport<T> writeSupport;
private ParquetOutputCommitter committer;
@@ -366,6 +379,7 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
.estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
+ .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
.build();
long blockSize = getLongBlockSize(conf);
@@ -383,11 +397,12 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
LOG.info("Page size checking is: {}", (props.estimateNextSizeCheck() ? "estimated" : "constant"));
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
+ LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
}
WriteContext init = writeSupport.init(conf);
ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
- init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize);
+ init.getSchema(), Mode.CREATE, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength());
w.start();
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index a32df39..5b0e4f8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -278,7 +278,7 @@ public class ParquetWriter<T> implements Closeable {
MessageType schema = writeContext.getSchema();
ParquetFileWriter fileWriter = new ParquetFileWriter(
- file, schema, mode, rowGroupSize, maxPaddingSize);
+ file, schema, mode, rowGroupSize, maxPaddingSize, encodingProps.getColumnIndexTruncateLength());
fileWriter.start();
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index b9382fc..90f4a5b 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -918,7 +918,7 @@ public class TestParquetMetadataConverter {
@Test
public void testColumnIndexConversion() {
PrimitiveType type = Types.required(PrimitiveTypeName.INT64).named("test_int64");
- ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type);
+ ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);
Statistics<?> stats = Statistics.createStats(type);
stats.incrementNumNulls(16);
stats.updateStats(-100l);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index b787268..9a27def 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -160,7 +160,8 @@ public class TestColumnChunkPageWriteStore {
writer.startBlock(rowCount);
pageOffset = outputFile.out().getPos();
{
- ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema , new HeapByteBufferAllocator());
+ ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema,
+ new HeapByteBufferAllocator(), Integer.MAX_VALUE);
PageWriter pageWriter = store.getPageWriter(col);
pageWriter.writePageV2(
rowCount, nullCount, valueCount,
@@ -235,7 +236,7 @@ public class TestColumnChunkPageWriteStore {
// TODO - look back at this, an allocator was being passed here in the ByteBuffer changes
// see comment at this constructor
ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
- compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator());
+ compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator(), Integer.MAX_VALUE);
for (ColumnDescriptor col : schema.getColumns()) {
PageWriter pageWriter = store.getPageWriter(col);
|