From: blue@apache.org
To: commits@parquet.apache.org
Date: Thu, 19 Jan 2017 01:27:18 -0000
Subject: [07/50] [abbrv] parquet-mr git commit: PARQUET-548: Add EncodingStats.

PARQUET-548: Add EncodingStats.

This adds `EncodingStats`, which tracks the number of pages for each encoding, separated into
dictionary and data pages. It also adds convenience functions that are useful for dictionary
filtering, like `hasDictionaryEncodedPages` and `hasNonDictionaryEncodedPages`.

`EncodingStats` has a unit test in parquet-column and an integration test in parquet-hadoop that
writes a file and verifies the stats are present and correct when the file is read back.

This includes commits from #330 because it updates the dictionary filter. I'll rebase and remove
them once it is merged.

Author: Ryan Blue

Closes #332 from rdblue/PARQUET-548-add-encoding-stats and squashes the following commits:

5f148e6 [Ryan Blue] PARQUET-548: Fixes for review comments.
dc332d3 [Ryan Blue] PARQUET-548: Add EncodingStats.

Conflicts:
	parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
Resolution:
	Minor formatting changes conflicted with wrapping encodings in a HashSet.
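[Editor's note] As a minimal sketch of the API described above: the snippet below builds stats for a
single column chunk and then runs the convenience checks a dictionary filter would use. The method
names come from the `EncodingStats` class added by this commit; the wrapper class name
`EncodingStatsExample` and the page counts are made-up illustrative values, not part of the change.

// Sketch only: assumes parquet-column (with the EncodingStats class added in this
// commit) is on the classpath. The page counts are hypothetical example values.
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;

public class EncodingStatsExample {
  public static void main(String[] args) {
    // Accumulate per-encoding page counts for one column chunk.
    EncodingStats stats = new EncodingStats.Builder()
        .addDictEncoding(Encoding.PLAIN_DICTIONARY)     // one dictionary page
        .addDataEncoding(Encoding.PLAIN_DICTIONARY, 3)  // three dictionary-encoded data pages
        .addDataEncoding(Encoding.PLAIN)                 // one fallback (plain) data page
        .build();

    // The convenience checks used for dictionary filtering:
    System.out.println("dictionary pages present: " + stats.hasDictionaryPages());           // true
    System.out.println("dict-encoded data pages:  " + stats.hasDictionaryEncodedPages());    // true
    System.out.println("non-dict data pages:      " + stats.hasNonDictionaryEncodedPages()); // true

    // Per-encoding page counts are also available.
    System.out.println("PLAIN data pages: " + stats.getNumDataPagesEncodedAs(Encoding.PLAIN));
  }
}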
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/d901cf9b
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/d901cf9b
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/d901cf9b

Branch: refs/heads/parquet-1.8.x
Commit: d901cf9b67fa718d948053749df55fab5740311e
Parents: 7885f9f
Author: Ryan Blue
Authored: Fri Apr 22 17:39:52 2016 -0700
Committer: Ryan Blue
Committed: Mon Jan 9 16:54:53 2017 -0800

----------------------------------------------------------------------
 .../apache/parquet/column/EncodingStats.java    | 162 +++++++++++++++
 .../parquet/column/TestEncodingStats.java       | 202 +++++++++++++++++++
 .../dictionarylevel/DictionaryFilter.java       |   6 +
 .../converter/ParquetMetadataConverter.java     |  50 +++++
 .../hadoop/ColumnChunkPageWriteStore.java       |  33 +--
 .../parquet/hadoop/DictionaryPageReader.java    |   7 +
 .../parquet/hadoop/ParquetFileWriter.java       |  31 ++-
 .../hadoop/metadata/ColumnChunkMetaData.java    |  67 +++---
 .../hadoop/TestReadWriteEncodingStats.java      | 121 +++++++++++
 9 files changed, 632 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java b/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java
new file mode 100644
index 0000000..a8b95f8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/EncodingStats.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+
+/**
+ * EncodingStats track dictionary and data page encodings for a single column within a row group.
+ * These are used when filtering row groups. For example, to filter a row group based on a column's
+ * dictionary, all of the data pages in that column must be dictionary-encoded. This class provides
+ * convenience methods for those checks, like {@link #hasNonDictionaryEncodedPages()}.
+ */
+public class EncodingStats {
+  final Map<Encoding, Integer> dictStats;
+  final Map<Encoding, Integer> dataStats;
+  private final boolean usesV2Pages;
+
+  private EncodingStats(Map<Encoding, Integer> dictStats,
+                        Map<Encoding, Integer> dataStats,
+                        boolean usesV2Pages) {
+    this.dictStats = dictStats;
+    this.dataStats = dataStats;
+    this.usesV2Pages = usesV2Pages;
+  }
+
+  public Set<Encoding> getDictionaryEncodings() {
+    return dictStats.keySet();
+  }
+
+  public Set<Encoding> getDataEncodings() {
+    return dataStats.keySet();
+  }
+
+  public int getNumDictionaryPagesEncodedAs(Encoding enc) {
+    if (dictStats.containsKey(enc)) {
+      return dictStats.get(enc);
+    } else {
+      return 0;
+    }
+  }
+
+  public int getNumDataPagesEncodedAs(Encoding enc) {
+    if (dataStats.containsKey(enc)) {
+      return dataStats.get(enc);
+    } else {
+      return 0;
+    }
+  }
+
+  public boolean hasDictionaryPages() {
+    return !dictStats.isEmpty();
+  }
+
+  public boolean hasDictionaryEncodedPages() {
+    Set<Encoding> encodings = dataStats.keySet();
+    return (encodings.contains(RLE_DICTIONARY) || encodings.contains(PLAIN_DICTIONARY));
+  }
+
+  public boolean hasNonDictionaryEncodedPages() {
+    if (dataStats.isEmpty()) {
+      return false; // no pages
+    }
+
+    // this modifies the set, so copy it
+    Set<Encoding> encodings = new HashSet<Encoding>(dataStats.keySet());
+    if (!encodings.remove(RLE_DICTIONARY) &&
+        !encodings.remove(PLAIN_DICTIONARY)) {
+      return true; // not dictionary encoded
+    }
+
+    if (encodings.isEmpty()) {
+      return false;
+    }
+
+    // at least one non-dictionary encoding is present
+    return true;
+  }
+
+  public boolean usesV2Pages() {
+    return usesV2Pages;
+  }
+
+  /**
+   * Used to build {@link EncodingStats} from metadata or to accumulate stats as pages are written.
+   */
+  public static class Builder {
+    private final Map<Encoding, Integer> dictStats = new LinkedHashMap<Encoding, Integer>();
+    private final Map<Encoding, Integer> dataStats = new LinkedHashMap<Encoding, Integer>();
+    private boolean usesV2Pages = false;
+
+    public Builder clear() {
+      this.usesV2Pages = false;
+      dictStats.clear();
+      dataStats.clear();
+      return this;
+    }
+
+    public Builder withV2Pages() {
+      this.usesV2Pages = true;
+      return this;
+    }
+
+    public Builder addDictEncoding(Encoding encoding) {
+      return addDictEncoding(encoding, 1);
+    }
+
+    public Builder addDictEncoding(Encoding encoding, int numPages) {
+      Integer pages = dictStats.get(encoding);
+      dictStats.put(encoding, numPages + (pages != null ? pages : 0));
+      return this;
+    }
+
+    public Builder addDataEncodings(Collection<Encoding> encodings) {
+      for (Encoding encoding : encodings) {
+        addDataEncoding(encoding);
+      }
+      return this;
+    }
+
+    public Builder addDataEncoding(Encoding encoding) {
+      return addDataEncoding(encoding, 1);
+    }
+
+    public Builder addDataEncoding(Encoding encoding, int numPages) {
+      Integer pages = dataStats.get(encoding);
+      dataStats.put(encoding, numPages + (pages != null ?
pages : 0)); + return this; + } + + public EncodingStats build() { + return new EncodingStats( + Collections.unmodifiableMap(new LinkedHashMap(dictStats)), + Collections.unmodifiableMap(new LinkedHashMap(dataStats)), + usesV2Pages); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-column/src/test/java/org/apache/parquet/column/TestEncodingStats.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/test/java/org/apache/parquet/column/TestEncodingStats.java b/parquet-column/src/test/java/org/apache/parquet/column/TestEncodingStats.java new file mode 100644 index 0000000..4c46688 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/TestEncodingStats.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.column; + +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestEncodingStats { + @Test + public void testReusedBuilder() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.withV2Pages(); + builder.addDictEncoding(Encoding.PLAIN); + builder.addDataEncoding(Encoding.RLE_DICTIONARY, 3); + builder.addDataEncoding(Encoding.DELTA_BYTE_ARRAY); + builder.addDataEncoding(Encoding.DELTA_BYTE_ARRAY); + EncodingStats stats1 = builder.build(); + + Map expectedDictStats1 = new HashMap(); + expectedDictStats1.put(Encoding.PLAIN, 1); + Map expectedDataStats1 = new HashMap(); + expectedDataStats1.put(Encoding.RLE_DICTIONARY, 3); + expectedDataStats1.put(Encoding.DELTA_BYTE_ARRAY, 2); + + builder.clear(); + builder.addDataEncoding(Encoding.PLAIN); + builder.addDataEncoding(Encoding.PLAIN); + builder.addDataEncoding(Encoding.PLAIN); + builder.addDataEncoding(Encoding.PLAIN); + EncodingStats stats2 = builder.build(); + + Map expectedDictStats2 = new HashMap(); + Map expectedDataStats2 = new HashMap(); + expectedDataStats2.put(Encoding.PLAIN, 4); + + assertEquals("Dictionary stats should be correct", expectedDictStats2, stats2.dictStats); + assertEquals("Data stats should be correct", expectedDataStats2, stats2.dataStats); + + assertEquals("Dictionary stats should be correct after reuse", expectedDictStats1, stats1.dictStats); + assertEquals("Data stats should be correct after reuse", expectedDataStats1, stats1.dataStats); + } + + @Test + public void testNoPages() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + EncodingStats stats = builder.build(); + + assertFalse(stats.usesV2Pages()); + assertFalse("Should not have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + 
assertFalse("Should not have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertFalse("Should not have dictionary pages", stats.hasDictionaryPages()); + } + + @Test + public void testNoDataPages() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.addDictEncoding(Encoding.PLAIN_DICTIONARY); + EncodingStats stats = builder.build(); + + assertFalse(stats.usesV2Pages()); + assertFalse("Should not have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + assertFalse("Should not have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertTrue("Should have dictionary pages", stats.hasDictionaryPages()); + } + + @Test + public void testV1AllDictionary() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.addDictEncoding(Encoding.PLAIN_DICTIONARY); + builder.addDataEncoding(Encoding.PLAIN_DICTIONARY); + builder.addDataEncoding(Encoding.PLAIN_DICTIONARY); + EncodingStats stats = builder.build(); + + assertFalse(stats.usesV2Pages()); + assertTrue("Should have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + assertFalse("Should not have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertTrue("Should have dictionary pages", stats.hasDictionaryPages()); + } + + @Test + public void testV1NoDictionary() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.addDataEncoding(Encoding.PLAIN); + EncodingStats stats = builder.build(); + + assertFalse(stats.usesV2Pages()); + assertFalse("Should not have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + assertTrue("Should have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertFalse("Should not have dictionary pages", stats.hasDictionaryPages()); + } + + @Test + public void testV1Fallback() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.addDictEncoding(Encoding.PLAIN_DICTIONARY); + builder.addDataEncoding(Encoding.PLAIN_DICTIONARY); + builder.addDataEncoding(Encoding.PLAIN_DICTIONARY); + builder.addDataEncoding(Encoding.PLAIN); + EncodingStats stats = builder.build(); + + assertFalse(stats.usesV2Pages()); + assertTrue("Should have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + assertTrue("Should have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertTrue("Should have dictionary pages", stats.hasDictionaryPages()); + } + + @Test + public void testV2AllDictionary() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.withV2Pages(); + builder.addDictEncoding(Encoding.PLAIN); + builder.addDataEncoding(Encoding.RLE_DICTIONARY); + EncodingStats stats = builder.build(); + + assertTrue(stats.usesV2Pages()); + assertTrue("Should have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + assertFalse("Should not have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertTrue("Should have dictionary pages", stats.hasDictionaryPages()); + } + + @Test + public void testV2NoDictionary() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.withV2Pages(); + builder.addDataEncoding(Encoding.DELTA_BINARY_PACKED); + builder.addDataEncoding(Encoding.DELTA_BINARY_PACKED); + EncodingStats stats = builder.build(); + + assertTrue(stats.usesV2Pages()); + assertFalse("Should not have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + assertTrue("Should have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertFalse("Should not have dictionary 
pages", stats.hasDictionaryPages()); + } + + @Test + public void testV2Fallback() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.withV2Pages(); + builder.addDictEncoding(Encoding.PLAIN); + builder.addDataEncoding(Encoding.RLE_DICTIONARY); + builder.addDataEncoding(Encoding.DELTA_BYTE_ARRAY); + builder.addDataEncoding(Encoding.DELTA_BYTE_ARRAY); + EncodingStats stats = builder.build(); + + assertTrue(stats.usesV2Pages()); + assertTrue("Should have dictionary-encoded pages", stats.hasDictionaryEncodedPages()); + assertTrue("Should have non-dictionary pages", stats.hasNonDictionaryEncodedPages()); + assertTrue("Should have dictionary pages", stats.hasDictionaryPages()); + } + + @Test + public void testCounts() { + EncodingStats.Builder builder = new EncodingStats.Builder(); + builder.withV2Pages(); + builder.addDictEncoding(Encoding.PLAIN); + builder.addDataEncoding(Encoding.RLE_DICTIONARY, 4); + builder.addDataEncoding(Encoding.RLE_DICTIONARY); + builder.addDataEncoding(Encoding.DELTA_BYTE_ARRAY); + builder.addDataEncoding(Encoding.DELTA_BYTE_ARRAY); + EncodingStats stats = builder.build(); + + assertEquals("Count should match", 1, stats.getNumDictionaryPagesEncodedAs(Encoding.PLAIN)); + assertEquals("Count should match", 0, stats.getNumDictionaryPagesEncodedAs(Encoding.PLAIN_DICTIONARY)); + assertEquals("Count should match", 0, stats.getNumDictionaryPagesEncodedAs(Encoding.RLE)); + assertEquals("Count should match", 0, stats.getNumDictionaryPagesEncodedAs(Encoding.BIT_PACKED)); + assertEquals("Count should match", 0, stats.getNumDictionaryPagesEncodedAs(Encoding.DELTA_BYTE_ARRAY)); + assertEquals("Count should match", 0, stats.getNumDictionaryPagesEncodedAs(Encoding.DELTA_BINARY_PACKED)); + assertEquals("Count should match", 0, stats.getNumDictionaryPagesEncodedAs(Encoding.DELTA_LENGTH_BYTE_ARRAY)); + + assertEquals("Count should match", 5, stats.getNumDataPagesEncodedAs(Encoding.RLE_DICTIONARY)); + assertEquals("Count should match", 2, stats.getNumDataPagesEncodedAs(Encoding.DELTA_BYTE_ARRAY)); + assertEquals("Count should match", 0, stats.getNumDataPagesEncodedAs(Encoding.RLE)); + assertEquals("Count should match", 0, stats.getNumDataPagesEncodedAs(Encoding.BIT_PACKED)); + assertEquals("Count should match", 0, stats.getNumDataPagesEncodedAs(Encoding.PLAIN)); + assertEquals("Count should match", 0, stats.getNumDataPagesEncodedAs(Encoding.PLAIN_DICTIONARY)); + assertEquals("Count should match", 0, stats.getNumDataPagesEncodedAs(Encoding.DELTA_BINARY_PACKED)); + assertEquals("Count should match", 0, stats.getNumDataPagesEncodedAs(Encoding.DELTA_LENGTH_BYTE_ARRAY)); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java index 6235c20..9b03f82 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java @@ -22,6 +22,7 @@ import org.apache.parquet.Log; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; import 
org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.filter2.predicate.FilterPredicate; @@ -329,6 +330,11 @@ public class DictionaryFilter implements FilterPredicate.Visitor { @SuppressWarnings("deprecation") private static boolean hasNonDictionaryPages(ColumnChunkMetaData meta) { + EncodingStats stats = meta.getEncodingStats(); + if (stats != null) { + return stats.hasNonDictionaryEncodedPages(); + } + // without EncodingStats, fall back to testing the encoding list Set encodings = new HashSet(meta.getEncodings()); if (encodings.remove(Encoding.PLAIN_DICTIONARY)) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 48f295e..6feb4a2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -38,6 +38,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.Log; +import org.apache.parquet.format.PageEncodingStats; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.format.ColumnChunk; import org.apache.parquet.format.ColumnMetaData; @@ -58,6 +59,7 @@ import org.apache.parquet.format.Type; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.column.EncodingStats; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.schema.GroupType; @@ -183,6 +185,9 @@ public class ParquetMetadataConverter { if (!columnMetaData.getStatistics().isEmpty()) { columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics())); } + if (columnMetaData.getEncodingStats() != null) { + columnChunk.meta_data.setEncoding_stats(convertEncodingStats(columnMetaData.getEncodingStats())); + } // columnChunk.meta_data.index_page_offset = ; // columnChunk.meta_data.key_value_metadata = ; // nothing yet @@ -232,6 +237,50 @@ public class ParquetMetadataConverter { return Encoding.valueOf(encoding.name()); } + public EncodingStats convertEncodingStats(List stats) { + if (stats == null) { + return null; + } + + EncodingStats.Builder builder = new EncodingStats.Builder(); + for (PageEncodingStats stat : stats) { + switch (stat.getPage_type()) { + case DATA_PAGE_V2: + builder.withV2Pages(); + // falls through + case DATA_PAGE: + builder.addDataEncoding( + getEncoding(stat.getEncoding()), stat.getCount()); + break; + case DICTIONARY_PAGE: + builder.addDictEncoding( + getEncoding(stat.getEncoding()), stat.getCount()); + break; + } + } + return builder.build(); + } + + public List convertEncodingStats(EncodingStats stats) { + if (stats == null) { + return null; + } + + List formatStats = new ArrayList(); + for (org.apache.parquet.column.Encoding encoding : stats.getDictionaryEncodings()) { + formatStats.add(new PageEncodingStats( + PageType.DICTIONARY_PAGE, 
getEncoding(encoding), + stats.getNumDictionaryPagesEncodedAs(encoding))); + } + PageType dataPageType = (stats.usesV2Pages() ? PageType.DATA_PAGE_V2 : PageType.DATA_PAGE); + for (org.apache.parquet.column.Encoding encoding : stats.getDataEncodings()) { + formatStats.add(new PageEncodingStats( + dataPageType, getEncoding(encoding), + stats.getNumDataPagesEncodedAs(encoding))); + } + return formatStats; + } + public static Statistics toParquetStatistics( org.apache.parquet.column.statistics.Statistics statistics) { Statistics stats = new Statistics(); @@ -613,6 +662,7 @@ public class ParquetMetadataConverter { path, messageType.getType(path.toArray()).asPrimitiveType().getPrimitiveTypeName(), CompressionCodecName.fromParquet(metaData.codec), + convertEncodingStats(metaData.getEncoding_stats()), fromFormatEncodings(metaData.encodings), fromParquetStatistics( parquetMetadata.getCreated_by(), http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index 72a04d5..7f20f52 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; @@ -62,7 +63,10 @@ class ColumnChunkPageWriteStore implements PageWriteStore { private long totalValueCount; private int pageCount; - private Set encodings = new HashSet(); + // repetition and definition level encodings are used only for v1 pages and don't change + private Set rlEncodings = new HashSet(); + private Set dlEncodings = new HashSet(); + private List dataEncodings = new ArrayList(); private Statistics totalStatistics; @@ -111,9 +115,9 @@ class ColumnChunkPageWriteStore implements PageWriteStore { // by concatenating before collecting instead of collecting twice, // we only allocate one buffer to copy into instead of multiple. 
buf.collect(BytesInput.concat(BytesInput.from(tempOutputStream), compressedBytes)); - encodings.add(rlEncoding); - encodings.add(dlEncoding); - encodings.add(valuesEncoding); + rlEncodings.add(rlEncoding); + dlEncodings.add(dlEncoding); + dataEncodings.add(valuesEncoding); } @Override @@ -156,7 +160,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore { definitionLevels, compressedData) ); - encodings.add(dataEncoding); + dataEncodings.add(dataEncoding); } private int toIntWithCheck(long size) { @@ -177,21 +181,24 @@ class ColumnChunkPageWriteStore implements PageWriteStore { writer.startColumn(path, totalValueCount, compressor.getCodecName()); if (dictionaryPage != null) { writer.writeDictionaryPage(dictionaryPage); - encodings.add(dictionaryPage.getEncoding()); + // tracking the dictionary encoding is handled in writeDictionaryPage } - writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, new ArrayList(encodings)); + writer.writeDataPages(buf, uncompressedLength, compressedLength, totalStatistics, + rlEncodings, dlEncodings, dataEncodings); writer.endColumn(); if (INFO) { LOG.info( String.format( "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s", - buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, encodings) - + (dictionaryPage != null ? String.format( - ", dic { %,d entries, %,dB raw, %,dB comp}", - dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) - : "")); + buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet(dataEncodings)) + + (dictionaryPage != null ? String.format( + ", dic { %,d entries, %,dB raw, %,dB comp}", + dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getDictionarySize()) + : "")); } - encodings.clear(); + rlEncodings.clear(); + dlEncodings.clear(); + dataEncodings.clear(); pageCount = 0; } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java index cb0d5e7..9a99358 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java @@ -21,6 +21,7 @@ package org.apache.parquet.hadoop; import org.apache.parquet.Strings; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.page.DictionaryPageReadStore; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -104,6 +105,12 @@ class DictionaryPageReader implements DictionaryPageReadStore { } private boolean hasDictionaryPage(ColumnChunkMetaData column) { + EncodingStats stats = column.getEncodingStats(); + if (stats != null) { + // ensure there is a dictionary page and that it is used to encode data pages + return stats.hasDictionaryPages() && stats.hasDictionaryEncodedPages(); + } + Set encodings = column.getEncodings(); return (encodings.contains(PLAIN_DICTIONARY) || encodings.contains(RLE_DICTIONARY)); } 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index 9279a6c..5dbc016 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -48,6 +48,7 @@ import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.metadata.ColumnPath; @@ -115,6 +116,7 @@ public class ParquetFileWriter { private long currentRecordCount; // set in startBlock // column chunk data accumulated as pages are written + private EncodingStats.Builder encodingStatsBuilder; private Set currentEncodings; private long uncompressedLength; private long compressedLength; @@ -236,6 +238,8 @@ public class ParquetFileWriter { this.alignment = NoAlignment.get(rowGroupSize); this.out = fs.create(file, overwriteFlag); } + + this.encodingStatsBuilder = new EncodingStats.Builder(); } /** @@ -256,6 +260,7 @@ public class ParquetFileWriter { rowAndBlockSize, rowAndBlockSize, maxPaddingSize); this.out = fs.create(file, true, DFS_BUFFER_SIZE_DEFAULT, fs.getDefaultReplication(file), rowAndBlockSize); + this.encodingStatsBuilder = new EncodingStats.Builder(); } /** @@ -295,6 +300,7 @@ public class ParquetFileWriter { long valueCount, CompressionCodecName compressionCodecName) throws IOException { state = state.startColumn(); + encodingStatsBuilder.clear(); currentEncodings = new HashSet(); currentChunkPath = ColumnPath.get(descriptor.getPath()); currentChunkType = descriptor.getType(); @@ -329,6 +335,7 @@ public class ParquetFileWriter { this.compressedLength += compressedPageSize + headerSize; if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize); dictionaryPage.getBytes().writeAllTo(out); + encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); currentEncodings.add(dictionaryPage.getEncoding()); } @@ -365,6 +372,7 @@ public class ParquetFileWriter { this.compressedLength += compressedPageSize + headerSize; if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); bytes.writeAllTo(out); + encodingStatsBuilder.addDataEncoding(valuesEncoding); currentEncodings.add(rlEncoding); currentEncodings.add(dlEncoding); currentEncodings.add(valuesEncoding); @@ -404,6 +412,7 @@ public class ParquetFileWriter { if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize); bytes.writeAllTo(out); currentStatistics.mergeStatistics(statistics); + encodingStatsBuilder.addDataEncoding(valuesEncoding); currentEncodings.add(rlEncoding); currentEncodings.add(dlEncoding); currentEncodings.add(valuesEncoding); @@ -416,11 +425,13 @@ public class ParquetFileWriter { * @param compressedTotalPageSize total compressed size (without page headers) * @throws IOException */ - void writeDataPages(BytesInput bytes, - long uncompressedTotalPageSize, - long compressedTotalPageSize, - Statistics totalStats, - List 
encodings) throws IOException { + void writeDataPages(BytesInput bytes, + long uncompressedTotalPageSize, + long compressedTotalPageSize, + Statistics totalStats, + Set rlEncodings, + Set dlEncodings, + List dataEncodings) throws IOException { state = state.write(); if (DEBUG) LOG.debug(out.getPos() + ": write data pages"); long headersSize = bytes.size() - compressedTotalPageSize; @@ -428,7 +439,13 @@ public class ParquetFileWriter { this.compressedLength += compressedTotalPageSize + headersSize; if (DEBUG) LOG.debug(out.getPos() + ": write data pages content"); bytes.writeAllTo(out); - currentEncodings.addAll(encodings); + encodingStatsBuilder.addDataEncodings(dataEncodings); + if (rlEncodings.isEmpty()) { + encodingStatsBuilder.withV2Pages(); + } + currentEncodings.addAll(rlEncodings); + currentEncodings.addAll(dlEncodings); + currentEncodings.addAll(dataEncodings); currentStatistics = totalStats; } @@ -443,6 +460,7 @@ public class ParquetFileWriter { currentChunkPath, currentChunkType, currentChunkCodec, + encodingStatsBuilder.build(), currentEncodings, currentStatistics, currentChunkFirstDataPage, @@ -540,6 +558,7 @@ public class ParquetFileWriter { chunk.getPath(), chunk.getType(), chunk.getCodec(), + chunk.getEncodingStats(), chunk.getEncodings(), chunk.getStatistics(), newChunkStart, http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java index 0c2fd4d..720bd77 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java @@ -21,6 +21,7 @@ package org.apache.parquet.hadoop.metadata; import java.util.Set; import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.EncodingStats; import org.apache.parquet.column.statistics.BooleanStatistics; import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; @@ -42,37 +43,33 @@ abstract public class ColumnChunkMetaData { long valueCount, long totalSize, long totalUncompressedSize) { - // to save space we store those always positive longs in ints when they fit. 
- if (positiveLongFitsInAnInt(firstDataPage) - && positiveLongFitsInAnInt(dictionaryPageOffset) - && positiveLongFitsInAnInt(valueCount) - && positiveLongFitsInAnInt(totalSize) - && positiveLongFitsInAnInt(totalUncompressedSize)) { - return new IntColumnChunkMetaData( - path, type, codec, encodings, - new BooleanStatistics(), - firstDataPage, - dictionaryPageOffset, - valueCount, - totalSize, - totalUncompressedSize); - } else { - return new LongColumnChunkMetaData( - path, type, codec, encodings, - new BooleanStatistics(), - firstDataPage, - dictionaryPageOffset, - valueCount, - totalSize, - totalUncompressedSize); - } + return get( + path, type, codec, null, encodings, new BooleanStatistics(), firstDataPage, + dictionaryPageOffset, valueCount, totalSize, totalUncompressedSize); } + @Deprecated + public static ColumnChunkMetaData get( + ColumnPath path, + PrimitiveTypeName type, + CompressionCodecName codec, + Set encodings, + Statistics statistics, + long firstDataPage, + long dictionaryPageOffset, + long valueCount, + long totalSize, + long totalUncompressedSize) { + return get( + path, type, codec, null, encodings, statistics, firstDataPage, dictionaryPageOffset, + valueCount, totalSize, totalUncompressedSize); + } public static ColumnChunkMetaData get( ColumnPath path, PrimitiveTypeName type, CompressionCodecName codec, + EncodingStats encodingStats, Set encodings, Statistics statistics, long firstDataPage, @@ -87,7 +84,8 @@ abstract public class ColumnChunkMetaData { && positiveLongFitsInAnInt(totalSize) && positiveLongFitsInAnInt(totalUncompressedSize)) { return new IntColumnChunkMetaData( - path, type, codec, encodings, + path, type, codec, + encodingStats, encodings, statistics, firstDataPage, dictionaryPageOffset, @@ -96,7 +94,8 @@ abstract public class ColumnChunkMetaData { totalUncompressedSize); } else { return new LongColumnChunkMetaData( - path, type, codec, encodings, + path, type, codec, + encodingStats, encodings, statistics, firstDataPage, dictionaryPageOffset, @@ -129,10 +128,17 @@ abstract public class ColumnChunkMetaData { return (value >= 0) && (value + Integer.MIN_VALUE <= Integer.MAX_VALUE); } + private final EncodingStats encodingStats; + // we save 3 references by storing together the column properties that have few distinct values private final ColumnChunkProperties properties; protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) { + this(null, columnChunkProperties); + } + + protected ColumnChunkMetaData(EncodingStats encodingStats, ColumnChunkProperties columnChunkProperties) { + this.encodingStats = encodingStats; this.properties = columnChunkProperties; } @@ -192,6 +198,9 @@ abstract public class ColumnChunkMetaData { return properties.getEncodings(); } + public EncodingStats getEncodingStats() { + return encodingStats; + } @Override public String toString() { @@ -224,6 +233,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { ColumnPath path, PrimitiveTypeName type, CompressionCodecName codec, + EncodingStats encodingStats, Set encodings, Statistics statistics, long firstDataPage, @@ -231,7 +241,7 @@ class IntColumnChunkMetaData extends ColumnChunkMetaData { long valueCount, long totalSize, long totalUncompressedSize) { - super(ColumnChunkProperties.get(path, type, codec, encodings)); + super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings)); this.firstDataPage = positiveLongToInt(firstDataPage); this.dictionaryPageOffset = positiveLongToInt(dictionaryPageOffset); this.valueCount = 
positiveLongToInt(valueCount); @@ -328,6 +338,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { ColumnPath path, PrimitiveTypeName type, CompressionCodecName codec, + EncodingStats encodingStats, Set encodings, Statistics statistics, long firstDataPageOffset, @@ -335,7 +346,7 @@ class LongColumnChunkMetaData extends ColumnChunkMetaData { long valueCount, long totalSize, long totalUncompressedSize) { - super(ColumnChunkProperties.get(path, type, codec, encodings)); + super(encodingStats, ColumnChunkProperties.get(path, type, codec, encodings)); this.firstDataPageOffset = firstDataPageOffset; this.dictionaryPageOffset = dictionaryPageOffset; this.valueCount = valueCount; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/d901cf9b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java new file mode 100644 index 0000000..69e11c1 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.EncodingStats; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.parquet.schema.MessageType; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; + +import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests that files are written with EncodingStats, the stats are readable, and generally correct. 
+ */ +public class TestReadWriteEncodingStats { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private static final Configuration CONF = new Configuration(); + private static final int NUM_RECORDS = 1000; + private static final MessageType SCHEMA = parseMessageType( + "message test { " + + "required binary dict_binary_field; " + + "required int32 plain_int32_field; " + + "required binary fallback_binary_field; " + + "} "); + + private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz"; + + private static void writeData(ParquetWriter writer) throws IOException { + SimpleGroupFactory f = new SimpleGroupFactory(SCHEMA); + for (int i = 0; i < NUM_RECORDS; i += 1) { + int index = i % ALPHABET.length(); + + Group group = f.newGroup() + .append("dict_binary_field", ALPHABET.substring(index, index+1)) + .append("plain_int32_field", i) + .append("fallback_binary_field", i < (NUM_RECORDS / 2) ? + ALPHABET.substring(index, index+1) : UUID.randomUUID().toString()); + + writer.write(group); + } + } + @Test + public void testReadWrite() throws Exception { + File file = temp.newFile("encoding-stats.parquet"); + assertTrue(file.delete()); + Path path = new Path(file.toString()); + + ParquetWriter writer = ExampleParquetWriter.builder(path) + .withWriterVersion(PARQUET_1_0) + .withPageSize(1024) // ensure multiple pages are written + .enableDictionaryEncoding() + .withDictionaryPageSize(2*1024) + .withConf(CONF) + .withType(SCHEMA) + .build(); + writeData(writer); + writer.close(); + + ParquetFileReader reader = ParquetFileReader.open(CONF, path); + assertEquals("Should have one row group", 1, reader.getRowGroups().size()); + BlockMetaData rowGroup = reader.getRowGroups().get(0); + + ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0); + EncodingStats dictStats = dictColumn.getEncodingStats(); + assertNotNull("Dict column should have non-null encoding stats", dictStats); + assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages()); + assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages()); + assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages()); + + ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1); + EncodingStats plainStats = plainColumn.getEncodingStats(); + assertNotNull("Plain column should have non-null encoding stats", plainStats); + assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages()); + assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages()); + assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages()); + + ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2); + EncodingStats fallbackStats = fallbackColumn.getEncodingStats(); + assertNotNull("Fallback column should have non-null encoding stats", fallbackStats); + assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages()); + assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages()); + assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages()); + } +}