Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id DE93D200BAA for ; Thu, 13 Oct 2016 03:05:28 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id DD309160AEE; Thu, 13 Oct 2016 01:05:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2F513160ACA for ; Thu, 13 Oct 2016 03:05:28 +0200 (CEST) Received: (qmail 41479 invoked by uid 500); 13 Oct 2016 01:05:27 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 41470 invoked by uid 99); 13 Oct 2016 01:05:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Oct 2016 01:05:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 34EC3DFCF2; Thu, 13 Oct 2016 01:05:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: blue@apache.org To: commits@parquet.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: parquet-mr git commit: PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused. Date: Thu, 13 Oct 2016 01:05:27 +0000 (UTC) archived-at: Thu, 13 Oct 2016 01:05:29 -0000 Repository: parquet-mr Updated Branches: refs/heads/master de99127d7 -> 59ec4f018 PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused. BytesInput is not supposed to be held and reused, but decompressed dictionary pages do this. Reusing the dictionary will cause a failure, so the cleanest option is to keep the bytes around once the underlying stream has been read. Author: Ryan Blue Closes #376 from rdblue/PARQUET-743-fix-reused-dictionaries and squashes the following commits: 28c0903 [Ryan Blue] PARQUET-743: Fix DictionaryFilter when dictionaries are reused. Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/59ec4f01 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/59ec4f01 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/59ec4f01 Branch: refs/heads/master Commit: 59ec4f018963eb55e32fafc2b924826c39c09682 Parents: de99127 Author: Ryan Blue Authored: Wed Oct 12 18:05:21 2016 -0700 Committer: Ryan Blue Committed: Wed Oct 12 18:05:21 2016 -0700 ---------------------------------------------------------------------- .../parquet/hadoop/DictionaryPageReader.java | 19 ++++++++++++++++++- .../dictionarylevel/DictionaryFilterTest.java | 4 ++-- 2 files changed, 20 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/59ec4f01/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java index 9a99358..2be7ffe 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java @@ -19,6 +19,7 @@ package org.apache.parquet.hadoop; import org.apache.parquet.Strings; +import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.EncodingStats; @@ -93,7 +94,10 @@ class DictionaryPageReader implements DictionaryPageReadStore { // check the cache again in case this thread waited on another reading the same page if (!cache.containsKey(dotPath)) { DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null; - cache.put(dotPath, dict); + // copy the dictionary to ensure it can be reused if it is returned + // more than once. this can happen when a DictionaryFilter has two or + // more predicates for the same column. + cache.put(dotPath, reusableCopy(dict)); } } @@ -104,6 +108,19 @@ class DictionaryPageReader implements DictionaryPageReadStore { } } + private static DictionaryPage reusableCopy(DictionaryPage dict) { + if (dict == null) { + return null; + } + try { + return new DictionaryPage( + BytesInput.from(dict.getBytes().toByteArray()), + dict.getDictionarySize(), dict.getEncoding()); + } catch (IOException e) { + throw new ParquetDecodingException("Cannot read dictionary", e); + } + } + private boolean hasDictionaryPage(ColumnChunkMetaData column) { EncodingStats stats = column.getEncodingStats(); if (stats != null) { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/59ec4f01/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java index 7af0c40..eca6332 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java @@ -56,7 +56,7 @@ import java.util.UUID; import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0; import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop; import static org.apache.parquet.filter2.predicate.FilterApi.*; -import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -118,7 +118,7 @@ public class DictionaryFilterTest { SimpleGroupFactory f = new SimpleGroupFactory(schema); ParquetWriter writer = ExampleParquetWriter.builder(file) .withWriterVersion(PARQUET_1_0) - .withCompressionCodec(UNCOMPRESSED) + .withCompressionCodec(GZIP) .withRowGroupSize(1024*1024) .withPageSize(1024) .enableDictionaryEncoding()