parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject git commit: PARQUET-123: Enable dictionary support in AvroIndexedRecordConverter
Date Mon, 03 Nov 2014 14:00:40 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master ccfca8f71 -> a29815abf


PARQUET-123: Enable dictionary support in AvroIndexedRecordConverter

If consumers are loading Parquet records into an immutable structure
like an Apache Spark RDD, being able to configure string reuse in
AvroIndexedRecordConverter can drastically reduce the overall memory
footprint of strings.

NOTE: This isn't meant to be a merge-able PR (yet). I want to use
this PR as a way to discuss: (1) if this is a reasonable approach
and (2) to learn if PrimitiveConverter needs to be thread-safe as
I'm currently using a ConcurrentHashMap. If there's agreement
that this would be worthwhile, I'll create a JIRA and write some
unit tests.

Author: Matt Massie <massie@cs.berkeley.edu>

Closes #76 from massie/immutable-strings and squashes the following commits:

88ce5bf [Matt Massie] PARQUET-123: Enable dictionary support in AvroIndexedRecordConverter


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/a29815ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/a29815ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/a29815ab

Branch: refs/heads/master
Commit: a29815abf4f0e51b332a8af1b83ad344104c14d9
Parents: ccfca8f
Author: Matt Massie <massie@cs.berkeley.edu>
Authored: Mon Nov 3 14:00:33 2014 +0000
Committer: Tom White <tom@cloudera.com>
Committed: Mon Nov 3 14:00:33 2014 +0000

----------------------------------------------------------------------
 .../avro/AvroIndexedRecordConverter.java        | 26 ++++++++++++++++++--
 .../avro/TestSpecificInputOutputFormat.java     | 11 +++++++--
 2 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/a29815ab/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
index e235741..870c6f0 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
@@ -25,6 +25,7 @@ import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.specific.SpecificData;
 import parquet.Preconditions;
+import parquet.column.Dictionary;
 import parquet.io.InvalidRecordException;
 import parquet.io.api.Binary;
 import parquet.io.api.Converter;
@@ -32,6 +33,7 @@ import parquet.io.api.GroupConverter;
 import parquet.io.api.PrimitiveConverter;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
 import parquet.schema.Type;
 
 class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter {
@@ -119,7 +121,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends
GroupConverter
     } else if (schema.getType().equals(Schema.Type.BYTES)) {
       return new FieldBytesConverter(parent);
     } else if (schema.getType().equals(Schema.Type.STRING)) {
-      return new FieldStringConverter(parent);
+      return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
     } else if (schema.getType().equals(Schema.Type.RECORD)) {
       return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema);
     } else if (schema.getType().equals(Schema.Type.ENUM)) {
@@ -320,9 +322,12 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends
GroupConverter
   static final class FieldStringConverter extends PrimitiveConverter {
 
     private final ParentValueContainer parent;
+    private final boolean dictionarySupport;
+    private String[] dict;
 
-    public FieldStringConverter(ParentValueContainer parent) {
+    public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) {
       this.parent = parent;
+      this.dictionarySupport = dictionarySupport;
     }
 
     @Override
@@ -330,6 +335,23 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends
GroupConverter
       parent.add(value.toStringUsingUTF8());
     }
 
+    @Override
+    public boolean hasDictionarySupport() {
+      return dictionarySupport;
+    }
+
+    @Override
+    public void setDictionary(Dictionary dictionary) {
+      dict = new String[dictionary.getMaxId() + 1];
+      for (int i = 0; i <= dictionary.getMaxId(); i++) {
+        dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
+      }
+    }
+
+    @Override
+    public void addValueFromDictionary(int dictionaryId) {
+      parent.add(dict[dictionaryId]);
+    }
   }
 
   static final class FieldEnumConverter extends PrimitiveConverter {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/a29815ab/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java b/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java
index 60ea2e4..b03a6c8 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestSpecificInputOutputFormat.java
@@ -18,6 +18,7 @@ package parquet.avro;
 import static java.lang.Thread.sleep;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
@@ -50,11 +51,11 @@ public class TestSpecificInputOutputFormat {
     Car.Builder carBuilder = Car.newBuilder()
         .setDoors(2)
         .setMake("Tesla")
-        .setModel("Model X")
+        .setModel(String.format("Model X v%d", i % 2))
         .setVin(new Vin(vin.getBytes()))
         .setYear(2014 + i)
         .setOptionalExtra(LeatherTrim.newBuilder().setColour("black").build())
-        .setRegistration("Calfornia");
+        .setRegistration("California");
     Engine.Builder engineBuilder = Engine.newBuilder()
         .setCapacity(85.0f)
         .setHasTurboCharger(false);
@@ -186,8 +187,13 @@ public class TestSpecificInputOutputFormat {
         "part-m-00000.parquet");
     final AvroParquetReader<Car> out = new AvroParquetReader<Car>(mapperOutput);
     Car car;
+    Car previousCar = null;
     int lineNumber = 0;
     while ((car = out.read()) != null) {
+      if (previousCar != null) {
+         // Testing reference equality here. The "model" field should be dictionary-encoded.
+         assertTrue(car.getModel() == previousCar.getModel());
+      }
       // Make sure that predicate push down worked as expected
       if (car.getEngine().getType() == EngineType.PETROL) {
         fail("UnboundRecordFilter failed to remove cars with PETROL engines");
@@ -199,6 +205,7 @@ public class TestSpecificInputOutputFormat {
       expectedCar.setOptionalExtra(null);
       assertEquals("line " + lineNumber, expectedCar, car);
       ++lineNumber;
+      previousCar = car;
     }
     out.close();
   }


Mime
View raw message