parquet-commits mailing list archives

From b...@apache.org
Subject [38/50] [abbrv] parquet-mr git commit: PARQUET-612: Add compression codec to FileEncodingsIT.
Date Thu, 19 Jan 2017 01:27:49 GMT
PARQUET-612: Add compression codec to FileEncodingsIT.

Author: Ryan Blue <blue@apache.org>

Closes #343 from rdblue/PARQUET-612-test-compression and squashes the following commits:

a5b7dbb [Ryan Blue] PARQUET-612: Add compression codec to FileEncodingsIT.

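This change parameterizes FileEncodingsIT over compression codecs as well as
primitive types. The codec list comes from the TEST_CODECS environment
variable and defaults to UNCOMPRESSED when unset; the Travis matrix below
splits the codecs across the two Hadoop profiles. Going by the .travis.yml
and test changes, a local run against gzip and snappy would look roughly
like this (a sketch; the -pl/-Dtest narrowing is optional and assumes the
codec implementations on the test classpath work on your platform):

    TEST_CODECS=gzip,snappy mvn test -P hadoop-2 -pl parquet-hadoop -Dtest=FileEncodingsIT

The test now also deep-copies dictionary and data pages (reusableCopy)
before holding them for later validation; BytesInput is designed to be
consumed immediately, which matters once pages pass through a codec.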

Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/d59381a5
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/d59381a5
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/d59381a5

Branch: refs/heads/parquet-1.8.x
Commit: d59381a5547675cbfdd2775c247f3c18abe43ea2
Parents: 1535970
Author: Ryan Blue <blue@apache.org>
Authored: Thu Jun 30 09:54:08 2016 -0700
Committer: Ryan Blue <blue@apache.org>
Committed: Mon Jan 9 16:54:54 2017 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 .../parquet/encodings/FileEncodingsIT.java      | 112 +++++++++++++++----
 2 files changed, 95 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d59381a5/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index aa95349..890a372 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -24,8 +24,8 @@ before_install:
   - cd ..
 
 env:
-  - HADOOP_PROFILE=default
-  - HADOOP_PROFILE=hadoop-2
+  - HADOOP_PROFILE=default TEST_CODECS=uncompressed
+  - HADOOP_PROFILE=hadoop-2 TEST_CODECS=gzip,snappy
 
 install: mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || mvn install --batch-mode -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true > mvn_install.log || (cat mvn_install.log && false)
 script: mvn test -P $HADOOP_PROFILE

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d59381a5/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java
index 72d281f..4af9866 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java
@@ -20,10 +20,13 @@ package org.apache.parquet.encodings;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnReaderImpl;
 import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
@@ -32,9 +35,11 @@ import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.*;
@@ -78,23 +83,42 @@ public class FileEncodingsIT {
 
   // Parameters
   private PrimitiveTypeName paramTypeName;
+  private CompressionCodecName compression;
 
   @Parameterized.Parameters
   public static Collection<Object[]> getParameters() {
-    return Arrays.asList(new Object[][] {
-        { PrimitiveTypeName.BOOLEAN },
-        { PrimitiveTypeName.INT32 },
-        { PrimitiveTypeName.INT64 },
-        { PrimitiveTypeName.INT96 },
-        { PrimitiveTypeName.FLOAT },
-        { PrimitiveTypeName.DOUBLE },
-        { PrimitiveTypeName.BINARY },
-        { PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY }
-    });
+    List<PrimitiveTypeName> types = Arrays.asList(
+        PrimitiveTypeName.BOOLEAN, PrimitiveTypeName.INT32, PrimitiveTypeName.INT64,
+        PrimitiveTypeName.INT96, PrimitiveTypeName.FLOAT, PrimitiveTypeName.DOUBLE,
+        PrimitiveTypeName.BINARY, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
+
+    List<CompressionCodecName> codecs;
+    String codecList = System.getenv("TEST_CODECS");
+    if (codecList != null) {
+      codecs = new ArrayList<CompressionCodecName>();
+      for (String codec : codecList.split(",")) {
+        codecs.add(CompressionCodecName.valueOf(codec.toUpperCase(Locale.ENGLISH)));
+      }
+    } else {
+      // otherwise test just UNCOMPRESSED
+      codecs = Arrays.asList(CompressionCodecName.UNCOMPRESSED);
+    }
+
+    System.err.println("Testing codecs: " + codecs);
+
+    List<Object[]> parameters = new ArrayList<Object[]>();
+    for (PrimitiveTypeName type : types) {
+      for (CompressionCodecName codec : codecs) {
+        parameters.add(new Object[] {type, codec});
+      }
+    }
+
+    return parameters;
   }
 
-  public FileEncodingsIT(PrimitiveTypeName typeName) {
+  public FileEncodingsIT(PrimitiveTypeName typeName, CompressionCodecName compression) {
     this.paramTypeName = typeName;
+    this.compression = compression;
   }
 
   @BeforeClass
@@ -118,8 +142,8 @@ public class FileEncodingsIT {
      * This loop will make sure to test future writer versions added to WriterVersion enum.
      */
     for (WriterVersion writerVersion : WriterVersion.values()) {
-      System.out.println(String.format("Testing %s/%s encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d",
-          writerVersion.toString(), this.paramTypeName.toString(), TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE));
+      System.out.println(String.format("Testing %s/%s/%s encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d",
+          writerVersion, this.paramTypeName, this.compression, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE));
 
       Path parquetFile = createTempFile();
       writeValuesToFile(parquetFile, this.paramTypeName, randomValues, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE, DISABLE_DICTIONARY, writerVersion);
@@ -136,8 +160,8 @@ public class FileEncodingsIT {
      * This loop will make sure to test future writer versions added to WriterVersion enum.
      */
     for (WriterVersion writerVersion : WriterVersion.values()) {
-      System.out.println(String.format("Testing %s/%s + DICTIONARY encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d",
-          writerVersion.toString(), this.paramTypeName.toString(), TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE));
+      System.out.println(String.format("Testing %s/%s/%s + DICTIONARY encodings using ROW_GROUP_SIZE=%d PAGE_SIZE=%d",
+          writerVersion, this.paramTypeName, this.compression, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE));
 
       Path parquetFile = createTempFile();
       writeValuesToFile(parquetFile, this.paramTypeName, dictionaryValues, TEST_ROW_GROUP_SIZE, TEST_PAGE_SIZE, ENABLE_DICTIONARY, writerVersion);
@@ -169,8 +193,15 @@ public class FileEncodingsIT {
     SimpleGroupFactory message = new SimpleGroupFactory(schema);
     GroupWriteSupport.setSchema(schema, configuration);
 
-    ParquetWriter<Group> writer = new ParquetWriter<Group>(file, new GroupWriteSupport(),
-        CompressionCodecName.UNCOMPRESSED, rowGroupSize, pageSize, TEST_DICT_PAGE_SIZE, enableDictionary, false, version, configuration);
+    ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
+        .withCompressionCodec(compression)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withDictionaryPageSize(TEST_DICT_PAGE_SIZE)
+        .withDictionaryEncoding(enableDictionary)
+        .withWriterVersion(version)
+        .withConf(configuration)
+        .build();
 
     for (Object o: values) {
       switch (type) {
@@ -303,7 +334,7 @@ public class FileEncodingsIT {
       for (PageReadStore pageReadStore : blockReaders) {
         for (ColumnDescriptor columnsDesc : fileSchema.getColumns()) {
           List<DataPage> pageGroup = getPageGroupForColumn(pageReadStore, columnsDesc);
-          DictionaryPage dictPage = getDictionaryPageForColumn(pageReadStore, columnsDesc);
+          DictionaryPage dictPage = reusableCopy(getDictionaryPageForColumn(pageReadStore, columnsDesc));
 
           List<?> expectedRowGroupValues = expectedValues.subList(rowsRead, (int)(rowsRead + pageReadStore.getRowCount()));
           validateFirstToLast(rowGroupID, dictPage, pageGroup, columnsDesc, expectedRowGroupValues);
@@ -315,6 +346,49 @@ public class FileEncodingsIT {
       }
     }
 
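+    // Copies the dictionary page so it can be validated after the reader has
+    // moved on: BytesInput is meant to be consumed right away, so the bytes
+    // are materialized into an array first.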
+    private static DictionaryPage reusableCopy(DictionaryPage dict) {
+      if (dict == null) {
+        return null;
+      }
+      try {
+        return new DictionaryPage(
+            BytesInput.from(dict.getBytes().toByteArray()),
+            dict.getDictionarySize(), dict.getEncoding());
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Cannot read dictionary", e);
+      }
+    }
+
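+    // Same idea for data pages: materialize the page bytes (and, for V2,
+    // the repetition/definition level streams) into reusable copies.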
+    private static DataPage reusableCopy(DataPage page) {
+      return page.accept(new DataPage.Visitor<DataPage>() {
+        @Override
+        public DataPage visit(DataPageV1 data) {
+          try {
+            return new DataPageV1(BytesInput.from(data.getBytes().toByteArray()),
+                data.getValueCount(), data.getUncompressedSize(), data.getStatistics(),
+                data.getRlEncoding(), data.getDlEncoding(), data.getValueEncoding());
+          } catch (IOException e) {
+            throw new ParquetDecodingException("Cannot read data", e);
+          }
+        }
+
+        @Override
+        public DataPage visit(DataPageV2 data) {
+          try {
+            return new DataPageV2(data.getRowCount(), data.getNullCount(), data.getValueCount(),
+                BytesInput.from(data.getRepetitionLevels().toByteArray()),
+                BytesInput.from(data.getDefinitionLevels().toByteArray()),
+                data.getDataEncoding(),
+                BytesInput.from(data.getData().toByteArray()),
+                data.getUncompressedSize(), data.getStatistics(),
+                data.isCompressed());
+          } catch (IOException e) {
+            throw new ParquetDecodingException("Cannot read data", e);
+          }
+        }
+      });
+    }
+
     private static void validateFirstToLast(int rowGroupID, DictionaryPage dictPage, List<DataPage> pageGroup, ColumnDescriptor desc, List<?> expectedValues) {
       int rowsRead = 0, pageID = 0;
       for (DataPage page : pageGroup) {
@@ -347,7 +421,7 @@ public class FileEncodingsIT {
 
       DataPage page;
       while ((page = pageReader.readPage()) != null) {
-        pageGroup.add(page);
+        pageGroup.add(reusableCopy(page));
       }
 
       return pageGroup;

