Return-Path: X-Original-To: apmail-parquet-commits-archive@minotaur.apache.org Delivered-To: apmail-parquet-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3341617C02 for ; Mon, 3 Nov 2014 14:01:03 +0000 (UTC) Received: (qmail 44246 invoked by uid 500); 3 Nov 2014 14:01:03 -0000 Delivered-To: apmail-parquet-commits-archive@parquet.apache.org Received: (qmail 44224 invoked by uid 500); 3 Nov 2014 14:01:03 -0000 Mailing-List: contact commits-help@parquet.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.incubator.apache.org Delivered-To: mailing list commits@parquet.incubator.apache.org Received: (qmail 44215 invoked by uid 99); 3 Nov 2014 14:01:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Nov 2014 14:01:03 +0000 X-ASF-Spam-Status: No, hits=-2000.6 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 03 Nov 2014 14:01:01 +0000 Received: (qmail 44030 invoked by uid 99); 3 Nov 2014 14:00:41 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Nov 2014 14:00:41 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id EF84F99B513; Mon, 3 Nov 2014 14:00:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tomwhite@apache.org To: commits@parquet.incubator.apache.org Message-Id: <2067a8c25a3b42fbb1eb31820f3e6f6f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: PARQUET-123: Enable dictionary support in AvroIndexedRecordConverter Date: Mon, 3 Nov 2014 14:00:40 +0000 (UTC) X-Virus-Checked: Checked by ClamAV on apache.org Repository: incubator-parquet-mr Updated Branches: refs/heads/master ccfca8f71 -> a29815abf PARQUET-123: Enable dictionary support in AvroIndexedRecordConverter If consumers are loading Parquet records into an immutable structure like an Apache Spark RDD, being able to configure string reuse in AvroIndexedRecordConverter can drastically reduce the overall memory footprint of strings. NOTE: This isn't meant to be a merge-able PR (yet). I want to use this PR as a way to discuss: (1) if this is a reasonable approach and (2) to learn if PrimitiveConverter needs to be thread-safe as I'm currently using a ConcurrentHashMap. If there's agreement that this would be worthwhile, I'll create a JIRA and write some unit tests. Author: Matt Massie Closes #76 from massie/immutable-strings and squashes the following commits: 88ce5bf [Matt Massie] PARQUET-123: Enable dictionary support in AvroIndexedRecordConverter Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/a29815ab Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/a29815ab Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/a29815ab Branch: refs/heads/master Commit: a29815abf4f0e51b332a8af1b83ad344104c14d9 Parents: ccfca8f Author: Matt Massie Authored: Mon Nov 3 14:00:33 2014 +0000 Committer: Tom White Committed: Mon Nov 3 14:00:33 2014 +0000 ---------------------------------------------------------------------- .../avro/AvroIndexedRecordConverter.java | 26 ++++++++++++++++++-- .../avro/TestSpecificInputOutputFormat.java | 11 +++++++-- 2 files changed, 33 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/a29815ab/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java index e235741..870c6f0 100644 --- a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java +++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java @@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; import parquet.Preconditions; +import parquet.column.Dictionary; import parquet.io.InvalidRecordException; import parquet.io.api.Binary; import parquet.io.api.Converter; @@ -32,6 +33,7 @@ import parquet.io.api.GroupConverter; import parquet.io.api.PrimitiveConverter; import parquet.schema.GroupType; import parquet.schema.MessageType; +import parquet.schema.OriginalType; import parquet.schema.Type; class AvroIndexedRecordConverter extends GroupConverter { @@ -119,7 +121,7 @@ class AvroIndexedRecordConverter extends GroupConverter } else if (schema.getType().equals(Schema.Type.BYTES)) { return new FieldBytesConverter(parent); } else if (schema.getType().equals(Schema.Type.STRING)) { - return new FieldStringConverter(parent); + return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8); } else if (schema.getType().equals(Schema.Type.RECORD)) { return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema); } else if (schema.getType().equals(Schema.Type.ENUM)) { @@ -320,9 +322,12 @@ class AvroIndexedRecordConverter extends GroupConverter static final class FieldStringConverter extends PrimitiveConverter { private final ParentValueContainer parent; + private final boolean dictionarySupport; + private String[] dict; - public FieldStringConverter(ParentValueContainer parent) { + public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) { this.parent = parent; + this.dictionarySupport = dictionarySupport; } @Override @@ -330,6 +335,23 @@ class AvroIndexedRecordConverter extends GroupConverter parent.add(value.toStringUsingUTF8()); } + @Override + public boolean hasDictionarySupport() { + return dictionarySupport; + } + + @Override + public void setDictionary(Dictionary dictionary) { + dict = new String[dictionary.getMaxId() + 1]; + for (int i = 0; i <= dictionary.getMaxId(); i++) { + dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8(); + } + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + parent.add(dict[dictionaryId]); + } } static final class FieldEnumConverter extends PrimitiveConverter { http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/a29815ab/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java index 60ea2e4..b03a6c8 100644 --- a/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java +++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java @@ -18,6 +18,7 @@ package parquet.avro; import static java.lang.Thread.sleep; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.collect.Lists; @@ -50,11 +51,11 @@ public class TestSpecificInputOutputFormat { Car.Builder carBuilder = Car.newBuilder() .setDoors(2) .setMake("Tesla") - .setModel("Model X") + .setModel(String.format("Model X v%d", i % 2)) .setVin(new Vin(vin.getBytes())) .setYear(2014 + i) .setOptionalExtra(LeatherTrim.newBuilder().setColour("black").build()) - .setRegistration("Calfornia"); + .setRegistration("California"); Engine.Builder engineBuilder = Engine.newBuilder() .setCapacity(85.0f) .setHasTurboCharger(false); @@ -186,8 +187,13 @@ public class TestSpecificInputOutputFormat { "part-m-00000.parquet"); final AvroParquetReader out = new AvroParquetReader(mapperOutput); Car car; + Car previousCar = null; int lineNumber = 0; while ((car = out.read()) != null) { + if (previousCar != null) { + // Testing reference equality here. The "model" field should be dictionary-encoded. + assertTrue(car.getModel() == previousCar.getModel()); + } // Make sure that predicate push down worked as expected if (car.getEngine().getType() == EngineType.PETROL) { fail("UnboundRecordFilter failed to remove cars with PETROL engines"); @@ -199,6 +205,7 @@ public class TestSpecificInputOutputFormat { expectedCar.setOptionalExtra(null); assertEquals("line " + lineNumber, expectedCar, car); ++lineNumber; + previousCar = car; } out.close(); }