Return-Path: X-Original-To: apmail-parquet-commits-archive@minotaur.apache.org Delivered-To: apmail-parquet-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E417217F62 for ; Thu, 19 Nov 2015 18:46:15 +0000 (UTC) Received: (qmail 82448 invoked by uid 500); 19 Nov 2015 18:46:15 -0000 Delivered-To: apmail-parquet-commits-archive@parquet.apache.org Received: (qmail 82423 invoked by uid 500); 19 Nov 2015 18:46:15 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 82414 invoked by uid 99); 19 Nov 2015 18:46:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Nov 2015 18:46:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A4FFBE17DA; Thu, 19 Nov 2015 18:46:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: blue@apache.org To: commits@parquet.apache.org Message-Id: <587ab8de98cc4dbbbbb18470bdcbad4d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: parquet-mr git commit: PARQUET-378: Add thoroughly parquet test encodings Date: Thu, 19 Nov 2015 18:46:15 +0000 (UTC) Repository: parquet-mr Updated Branches: refs/heads/master 09129877d -> efafa6199 PARQUET-378: Add thoroughly parquet test encodings A new test case TestTypeEncodings is added that tests v1 and v2 encodings for all supported column types. This test case spans many pages and row groups, and reads each page individually from first-to-last and from last-to-first. 
Author: Sergio Pena Closes #274 from spena/parquet-378 and squashes the following commits: b35c339 [Sergio Pena] PARQUET-378: Add thoroughly parquet test encodings Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/efafa619 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/efafa619 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/efafa619 Branch: refs/heads/master Commit: efafa61992658eab64c893e9eef49f545d75673c Parents: 0912987 Author: Sergio Pena Authored: Thu Nov 19 10:46:07 2015 -0800 Committer: Ryan Blue Committed: Thu Nov 19 10:46:07 2015 -0800 ---------------------------------------------------------------------- .../parquet/encodings/FileEncodingsIT.java | 490 +++++++++++++++++++ .../apache/parquet/statistics/RandomValues.java | 18 +- pom.xml | 13 + 3 files changed, 512 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/efafa619/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java new file mode 100644 index 0000000..72d281f --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java @@ -0,0 +1,490 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.encodings; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.column.impl.ColumnReaderImpl; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.column.page.PageReader; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.*; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; +import org.apache.parquet.statistics.RandomValues; +import org.apache.parquet.statistics.TestStatistics; +import org.junit.*; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +import static junit.framework.Assert.assertEquals; + +/** + * This 
class contains test cases to validate each data type encoding. + * Each test runs against all Parquet writer versions. + * All data types are validated with and without dictionary encoding. + */ +@RunWith(Parameterized.class) +public class FileEncodingsIT { + private static final int RANDOM_SEED = 1; + private static final int RECORD_COUNT = 2000000; + private static final int FIXED_LENGTH = 60; + private static final int TEST_PAGE_SIZE = 16 * 1024; // 16K + private static final int TEST_ROW_GROUP_SIZE = 128 * 1024; // 128K + private static final int TEST_DICT_PAGE_SIZE = TEST_PAGE_SIZE; + + private static final Configuration configuration = new Configuration(); + + private static RandomValues.IntGenerator intGenerator; + private static RandomValues.LongGenerator longGenerator; + private static RandomValues.Int96Generator int96Generator; + private static RandomValues.FloatGenerator floatGenerator; + private static RandomValues.DoubleGenerator doubleGenerator; + private static RandomValues.BinaryGenerator binaryGenerator; + private static RandomValues.FixedGenerator fixedBinaryGenerator; + + // Parameters + private PrimitiveTypeName paramTypeName; + + @Parameterized.Parameters + public static Collection getParameters() { + return Arrays.asList(new Object[][] { + { PrimitiveTypeName.BOOLEAN }, + { PrimitiveTypeName.INT32 }, + { PrimitiveTypeName.INT64 }, + { PrimitiveTypeName.INT96 }, + { PrimitiveTypeName.FLOAT }, + { PrimitiveTypeName.DOUBLE }, + { PrimitiveTypeName.BINARY }, + { PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY } + }); + } + + public FileEncodingsIT(PrimitiveTypeName typeName) { + this.paramTypeName = typeName; + } + + @BeforeClass + public static void initialize() throws IOException { + Random random = new Random(RANDOM_SEED); + intGenerator = new RandomValues.IntGenerator(random.nextLong()); + longGenerator = new RandomValues.LongGenerator(random.nextLong()); + int96Generator = new RandomValues.Int96Generator(random.nextLong()); + floatGenerator = new 
RandomValues.FloatGenerator(random.nextLong()); + doubleGenerator = new RandomValues.DoubleGenerator(random.nextLong()); + binaryGenerator = new RandomValues.BinaryGenerator(random.nextLong()); + fixedBinaryGenerator = new RandomValues.FixedGenerator(random.nextLong(), FIXED_LENGTH); + } + + @Test + public void testFileEncodingsWithoutDictionary() throws Exception { + final boolean DISABLE_DICTIONARY = false; + List randomValues; randomValues = generateRandomValues(this.paramTypeName, RECORD_COUNT); + + /* Run an encoding test per each writer version. + * This loop will make sure to test future writer versions added to WriterVersion enum. + */ + for (WriterVersion writerVersion : WriterVersion.values()) { + System.out.println(String.format("Testing %s/%s encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d", + writerVersion.toString(), this.paramTypeName.toString(), TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE)); + + Path parquetFile = createTempFile(); + writeValuesToFile(parquetFile, this.paramTypeName, randomValues, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE, DISABLE_DICTIONARY, writerVersion); + PageGroupValidator.validatePages(parquetFile, randomValues); + } + } + + @Test + public void testFileEncodingsWithDictionary() throws Exception { + final boolean ENABLE_DICTIONARY = true; + List dictionaryValues = generateDictionaryValues(this.paramTypeName, RECORD_COUNT); + + /* Run an encoding test per each writer version. + * This loop will make sure to test future writer versions added to WriterVersion enum. 
+ */ + for (WriterVersion writerVersion : WriterVersion.values()) { + System.out.println(String.format("Testing %s/%s + DICTIONARY encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d", + writerVersion.toString(), this.paramTypeName.toString(), TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE)); + + Path parquetFile = createTempFile(); + writeValuesToFile(parquetFile, this.paramTypeName, dictionaryValues, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE, ENABLE_DICTIONARY, writerVersion); + PageGroupValidator.validatePages(parquetFile, dictionaryValues); + } + } + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private Path createTempFile() throws IOException { + File tempFile = tempFolder.newFile(); + tempFile.delete(); + return new Path(tempFile.getAbsolutePath()); + } + + /** + * Writes a set of values to a parquet file. + * The ParquetWriter will write the values with dictionary encoding disabled so that we test specific encodings for + */ + private void writeValuesToFile(Path file, PrimitiveTypeName type, List values, int rowGroupSize, int pageSize, boolean enableDictionary, WriterVersion version) throws IOException { + MessageType schema; + if (type == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + schema = Types.buildMessage().required(type).length(FIXED_LENGTH).named("field").named("test"); + } else { + schema = Types.buildMessage().required(type).named("field").named("test"); + } + + SimpleGroupFactory message = new SimpleGroupFactory(schema); + GroupWriteSupport.setSchema(schema, configuration); + + ParquetWriter writer = new ParquetWriter(file, new GroupWriteSupport(), + CompressionCodecName.UNCOMPRESSED, rowGroupSize, pageSize, TEST_DICT_PAGE_SIZE, enableDictionary, false, version, configuration); + + for (Object o: values) { + switch (type) { + case BOOLEAN: + writer.write(message.newGroup().append("field", (Boolean)o)); + break; + case INT32: + writer.write(message.newGroup().append("field", (Integer)o)); + break; + case INT64: + 
writer.write(message.newGroup().append("field", (Long)o)); + break; + case FLOAT: + writer.write(message.newGroup().append("field", (Float)o)); + break; + case DOUBLE: + writer.write(message.newGroup().append("field", (Double)o)); + break; + case INT96: + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + writer.write(message.newGroup().append("field", (Binary)o)); + break; + default: + throw new IllegalArgumentException("Unknown type name: " + type); + } + } + + writer.close(); + } + + private List generateRandomValues(PrimitiveTypeName type, int count) { + List values = new ArrayList(); + + for (int i=0; i generateDictionaryValues(PrimitiveTypeName type, int count) { + final int DICT_VALUES_SIZE = 100; + + final List DICT_BINARY_VALUES = generateRandomValues(PrimitiveTypeName.BINARY, DICT_VALUES_SIZE); + final List DICT_INT96_VALUES = generateRandomValues(PrimitiveTypeName.INT96, DICT_VALUES_SIZE); + final List DICT_FIXED_LEN_VALUES = generateRandomValues(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, DICT_VALUES_SIZE); + + List values = new ArrayList(); + + for (int i=0; i expectedValues) throws IOException { + List blockReaders = readBlocksFromFile(file); + MessageType fileSchema = readSchemaFromFile(file); + int rowGroupID = 0; + int rowsRead = 0; + for (PageReadStore pageReadStore : blockReaders) { + for (ColumnDescriptor columnsDesc : fileSchema.getColumns()) { + List pageGroup = getPageGroupForColumn(pageReadStore, columnsDesc); + DictionaryPage dictPage = getDictionaryPageForColumn(pageReadStore, columnsDesc); + + List expectedRowGroupValues = expectedValues.subList(rowsRead, (int)(rowsRead + pageReadStore.getRowCount())); + validateFirstToLast(rowGroupID, dictPage, pageGroup, columnsDesc, expectedRowGroupValues); + validateLastToFirst(rowGroupID, dictPage, pageGroup, columnsDesc, expectedRowGroupValues); + } + + rowsRead += pageReadStore.getRowCount(); + rowGroupID++; + } + } + + private static void validateFirstToLast(int rowGroupID, DictionaryPage dictPage, List 
pageGroup, ColumnDescriptor desc, List expectedValues) { + int rowsRead = 0, pageID = 0; + for (DataPage page : pageGroup) { + List expectedPageValues = expectedValues.subList(rowsRead, rowsRead + page.getValueCount()); + PageValuesValidator.validateValuesForPage(rowGroupID, pageID, dictPage, page, desc, expectedPageValues); + rowsRead += page.getValueCount(); + pageID++; + } + } + + private static void validateLastToFirst(int rowGroupID, DictionaryPage dictPage, List pageGroup, ColumnDescriptor desc, List expectedValues) { + int rowsLeft = expectedValues.size(); + for (int pageID = pageGroup.size() - 1; pageID >= 0; pageID--) { + DataPage page = pageGroup.get(pageID); + int offset = rowsLeft - page.getValueCount(); + List expectedPageValues = expectedValues.subList(offset, offset + page.getValueCount()); + PageValuesValidator.validateValuesForPage(rowGroupID, pageID, dictPage, page, desc, expectedPageValues); + rowsLeft -= page.getValueCount(); + } + } + + private static DictionaryPage getDictionaryPageForColumn(PageReadStore pageReadStore, ColumnDescriptor columnDescriptor) { + PageReader pageReader = pageReadStore.getPageReader(columnDescriptor); + return pageReader.readDictionaryPage(); + } + + private static List getPageGroupForColumn(PageReadStore pageReadStore, ColumnDescriptor columnDescriptor) { + PageReader pageReader = pageReadStore.getPageReader(columnDescriptor); + List pageGroup = new ArrayList(); + + DataPage page; + while ((page = pageReader.readPage()) != null) { + pageGroup.add(page); + } + + return pageGroup; + } + + private static MessageType readSchemaFromFile(Path file) throws IOException { + ParquetMetadata metadata = ParquetFileReader.readFooter(configuration, file, ParquetMetadataConverter.NO_FILTER); + return metadata.getFileMetaData().getSchema(); + } + + + private static List readBlocksFromFile(Path file) throws IOException { + List rowGroups = new ArrayList(); + + ParquetMetadata metadata = ParquetFileReader.readFooter(configuration, 
file, ParquetMetadataConverter.NO_FILTER); + ParquetFileReader fileReader = new ParquetFileReader(configuration, metadata.getFileMetaData(), file, metadata.getBlocks(), + metadata.getFileMetaData().getSchema().getColumns()); + + PageReadStore group; + while ((group = fileReader.readNextRowGroup()) != null) { + rowGroups.add(group); + } + + return rowGroups; + } + } + + /** + * This class is used to validate all values read from a page against a list + * of expected values. + */ + private static class PageValuesValidator { + private List expectedValues; + private int currentPos; + private int pageID; + private int rowGroupID; + + public PageValuesValidator(int rowGroupID, int pageID, List expectedValues) { + this.rowGroupID = rowGroupID; + this.pageID = pageID; + this.expectedValues = expectedValues; + } + + public void validateNextValue(Object value) { + assertEquals(String.format("Value from page is different than expected, ROW_GROUP_ID=%d PAGE_ID=%d VALUE_POS=%d", + rowGroupID, pageID, currentPos), expectedValues.get(currentPos++), value); + } + + public static void validateValuesForPage(int rowGroupID, int pageID, DictionaryPage dictPage, DataPage page, ColumnDescriptor columnDesc, List expectedValues) { + TestStatistics.SingletonPageReader pageReader = new TestStatistics.SingletonPageReader(dictPage, page); + PrimitiveConverter converter = getConverter(rowGroupID, pageID, columnDesc.getType(), expectedValues); + ColumnReaderImpl column = new ColumnReaderImpl(columnDesc, pageReader, converter, null); + for (int i = 0; i < pageReader.getTotalValueCount(); i += 1) { + column.writeCurrentValueToConverter(); + column.consume(); + } + } + + private static PrimitiveConverter getConverter(final int rowGroupID, final int pageID, PrimitiveTypeName type, final List expectedValues) { + return type.convert(new PrimitiveType.PrimitiveTypeNameConverter() { + + @Override + public PrimitiveConverter convertFLOAT(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + 
final PageValuesValidator validator = new PageValuesValidator(rowGroupID, pageID, expectedValues); + return new PrimitiveConverter() { + @Override + public void addFloat(float value) { + validator.validateNextValue(value); + } + }; + } + + @Override + public PrimitiveConverter convertDOUBLE(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + final PageValuesValidator validator = new PageValuesValidator(rowGroupID, pageID, expectedValues); + return new PrimitiveConverter() { + @Override + public void addDouble(double value) { + validator.validateNextValue(value); + } + }; + } + + @Override + public PrimitiveConverter convertINT32(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + final PageValuesValidator validator = new PageValuesValidator(rowGroupID, pageID, expectedValues); + return new PrimitiveConverter() { + @Override + public void addInt(int value) { + validator.validateNextValue(value); + } + }; + } + + @Override + public PrimitiveConverter convertINT64(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + final PageValuesValidator validator = new PageValuesValidator(rowGroupID, pageID, expectedValues); + return new PrimitiveConverter() { + @Override + public void addLong(long value) { + validator.validateNextValue(value); + } + }; + } + + @Override + public PrimitiveConverter convertINT96(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + return convertBINARY(primitiveTypeName); + } + + @Override + public PrimitiveConverter convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + return convertBINARY(primitiveTypeName); + } + + @Override + public PrimitiveConverter convertBOOLEAN(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + final PageValuesValidator validator = new PageValuesValidator(rowGroupID, pageID, expectedValues); + return new PrimitiveConverter() { + @Override + public void addBoolean(boolean value) { + validator.validateNextValue(value); + } 
+ }; + } + + @Override + public PrimitiveConverter convertBINARY(PrimitiveTypeName primitiveTypeName) throws RuntimeException { + final PageValuesValidator validator = new PageValuesValidator(rowGroupID, pageID, expectedValues); + return new PrimitiveConverter() { + @Override + public void addBinary(Binary value) { + validator.validateNextValue(value); + } + }; + } + }); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/efafa619/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java index fa73099..cbdd935 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java @@ -79,7 +79,7 @@ public class RandomValues { return builder.toString(); } - protected abstract T nextValue(); + public abstract T nextValue(); } private static abstract class RandomBinaryBase> extends RandomValueGenerator { @@ -113,7 +113,7 @@ public class RandomValues { } @Override - protected Integer nextValue() { + public Integer nextValue() { return (minimum + randomInt(range)); } } @@ -129,7 +129,7 @@ public class RandomValues { } @Override - protected Long nextValue() { + public Long nextValue() { return (minimum + randomLong(range)); } } @@ -147,7 +147,7 @@ public class RandomValues { } @Override - protected BigInteger nextValue() { + public BigInteger nextValue() { return (minimum.add(randomInt96(range))); } @@ -168,7 +168,7 @@ public class RandomValues { } @Override - protected Float nextValue() { + public Float nextValue() { return (minimum + randomFloat(range)); } } @@ -184,7 +184,7 @@ public class RandomValues { } @Override - protected Double nextValue() { + public Double nextValue() { return (minimum + 
randomDouble(range)); } } @@ -196,7 +196,7 @@ public class RandomValues { } @Override - protected String nextValue() { + public String nextValue() { int stringLength = randomInt(15) + 1; return randomString(stringLength); } @@ -214,7 +214,7 @@ public class RandomValues { } @Override - protected Binary nextValue() { + public Binary nextValue() { // use a random length, but ensure it is at least a few bytes int length = 5 + randomInt(buffer.length - 5); for (int index = 0; index < length; index++) { @@ -236,7 +236,7 @@ public class RandomValues { } @Override - protected Binary nextValue() { + public Binary nextValue() { for (int index = 0; index < buffer.length; index++) { buffer[index] = (byte) randomInt(); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/efafa619/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index c9d7b94..c769ad3 100644 --- a/pom.xml +++ b/pom.xml @@ -335,6 +335,19 @@ org.apache.maven.plugins + maven-failsafe-plugin + 2.10 + + + + integration-test + verify + + + + + + org.apache.maven.plugins maven-surefire-plugin 2.10