parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject incubator-parquet-mr git commit: PARQUET-140: Allow clients to control the GenericData instance used to read Avro records
Date Tue, 02 Dec 2014 16:19:25 GMT
Repository: incubator-parquet-mr
Updated Branches:
  refs/heads/master ad06e6114 -> b5f6a3bd8


PARQUET-140: Allow clients to control the GenericData instance used to read Avro records

Author: Josh Wills <jwills@cloudera.com>

Closes #90 from jwills/master and squashes the following commits:

044cf54 [Josh Wills] PARQUET-140: Allow clients to control the GenericData object that is
used to read Avro records


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/b5f6a3bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/b5f6a3bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/b5f6a3bd

Branch: refs/heads/master
Commit: b5f6a3bd86bfe0f186b07eb69480564d5fc854dc
Parents: ad06e61
Author: Josh Wills <jwills@cloudera.com>
Authored: Tue Dec 2 16:19:14 2014 +0000
Committer: Tom White <tom@cloudera.com>
Committed: Tue Dec 2 16:19:14 2014 +0000

----------------------------------------------------------------------
 .../java/parquet/avro/AvroDataSupplier.java     | 28 +++++++++
 .../avro/AvroIndexedRecordConverter.java        | 66 +++++++++++++-------
 .../parquet/avro/AvroParquetInputFormat.java    | 11 ++++
 .../main/java/parquet/avro/AvroReadSupport.java | 14 ++++-
 .../parquet/avro/AvroRecordMaterializer.java    |  6 +-
 .../java/parquet/avro/SpecificDataSupplier.java | 26 ++++++++
 6 files changed, 124 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/b5f6a3bd/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java b/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
new file mode 100644
index 0000000..8d9932a
--- /dev/null
+++ b/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+
+/**
+ * Allows clients to control how the classes associated
+ * with specific Avro records are managed and found, e.g.,
+ * by creating an instance of {@code GenericData} that
+ * uses a particular {@code ClassLoader}.
+ */
+public interface AvroDataSupplier {
+  GenericData get();
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/b5f6a3bd/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
index 870c6f0..3982036 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroIndexedRecordConverter.java
@@ -49,18 +49,29 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
   private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>();
 
   public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) {
-    this(null, parquetSchema, avroSchema);
+    this(parquetSchema, avroSchema, SpecificData.get());
+  }
+
+  public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema,
+      GenericData baseModel) {
+    this(null, parquetSchema, avroSchema, baseModel);
   }
 
   public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
       parquetSchema, Schema avroSchema) {
+    this(parent, parquetSchema, avroSchema, SpecificData.get());
+  }
+
+  public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
+      parquetSchema, Schema avroSchema, GenericData baseModel) {
     this.parent = parent;
     this.avroSchema = avroSchema;
     int schemaSize = parquetSchema.getFieldCount();
     this.converters = new Converter[schemaSize];
-    this.specificClass = SpecificData.get().getClass(avroSchema);
+    this.specificClass = baseModel instanceof SpecificData ?
+        ((SpecificData) baseModel).getClass(avroSchema) : null;
 
-    model = this.specificClass == null ? GenericData.get() : SpecificData.get();
+    this.model = this.specificClass == null ? GenericData.get() : baseModel;
 
     Map<String, Integer> avroFieldIndexes = new HashMap<String, Integer>();
     int avroFieldIndex = 0;
@@ -72,7 +83,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
       Schema.Field avroField = getAvroField(parquetField.getName());
       Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema());
       final int finalAvroIndex = avroFieldIndexes.remove(avroField.name());
-      converters[parquetFieldIndex++] = newConverter(nonNullSchema, parquetField, new ParentValueContainer() {
+      converters[parquetFieldIndex++] = newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() {
         @Override
         void add(Object value) {
           AvroIndexedRecordConverter.this.set(finalAvroIndex, value);
@@ -107,7 +118,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
   }
 
   private static Converter newConverter(Schema schema, Type type,
-      ParentValueContainer parent) {
+      GenericData model, ParentValueContainer parent) {
     if (schema.getType().equals(Schema.Type.BOOLEAN)) {
       return new FieldBooleanConverter(parent);
     } else if (schema.getType().equals(Schema.Type.INT)) {
@@ -123,17 +134,17 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     } else if (schema.getType().equals(Schema.Type.STRING)) {
       return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
     } else if (schema.getType().equals(Schema.Type.RECORD)) {
-      return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema);
+      return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema, model);
     } else if (schema.getType().equals(Schema.Type.ENUM)) {
-      return new FieldEnumConverter(parent,schema);
+      return new FieldEnumConverter(parent, schema, model);
     } else if (schema.getType().equals(Schema.Type.ARRAY)) {
-      return new AvroArrayConverter(parent, type, schema);
+      return new AvroArrayConverter(parent, type, schema, model);
     } else if (schema.getType().equals(Schema.Type.MAP)) {
-      return new MapConverter(parent, type, schema);
+      return new MapConverter(parent, type, schema, model);
     } else if (schema.getType().equals(Schema.Type.UNION)) {
-      return new AvroUnionConverter(parent, type, schema);
+      return new AvroUnionConverter(parent, type, schema, model);
     } else if (schema.getType().equals(Schema.Type.FIXED)) {
-      return new FieldFixedConverter(parent, schema);
+      return new FieldFixedConverter(parent, schema, model);
     }
     throw new UnsupportedOperationException(String.format("Cannot convert Avro type: %s" +
         " (Parquet type: %s) ", schema, type));
@@ -153,7 +164,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     // Should do the right thing whether it is generic or specific
     this.currentRecord = (T) ((this.specificClass == null) ?
             new GenericData.Record(avroSchema) :
-            SpecificData.newInstance(specificClass, avroSchema));
+            ((SpecificData) model).newInstance(specificClass, avroSchema));
   }
 
   @Override
@@ -359,9 +370,12 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     private final ParentValueContainer parent;
     private final Class<? extends Enum> enumClass;
 
-    public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema) {
+    public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema,
+        GenericData model) {
       this.parent = parent;
-      this.enumClass = SpecificData.get().getClass(enumSchema);
+      this.enumClass = model instanceof SpecificData ?
+          ((SpecificData) model).getClass(enumSchema) :
+          SpecificData.get().getClass(enumSchema);
     }
 
     @Override
@@ -381,10 +395,13 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     private final Class<? extends GenericData.Fixed> fixedClass;
     private final Constructor fixedClassCtor;
 
-    public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema) {
+    public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema,
+        GenericData model) {
       this.parent = parent;
       this.avroSchema = avroSchema;
-      this.fixedClass = SpecificData.get().getClass(avroSchema);
+      this.fixedClass = model instanceof SpecificData ?
+          ((SpecificData) model).getClass(avroSchema) :
+          SpecificData.get().getClass(avroSchema);
       if (fixedClass != null) {
         try {
           this.fixedClassCtor = 
@@ -424,12 +441,12 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     private GenericArray<T> array;
 
     public AvroArrayConverter(ParentValueContainer parent, Type parquetSchema,
-        Schema avroSchema) {
+        Schema avroSchema, GenericData model) {
       this.parent = parent;
       this.avroSchema = avroSchema;
       Type elementType = parquetSchema.asGroupType().getType(0);
       Schema elementSchema = avroSchema.getElementType();
-      converter = newConverter(elementSchema, elementType, new ParentValueContainer() {
+      converter = newConverter(elementSchema, elementType, model, new ParentValueContainer() {
         @Override
         @SuppressWarnings("unchecked")
         void add(Object value) {
@@ -461,7 +478,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     private Object memberValue = null;
 
     public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema,
-                              Schema avroSchema) {
+                              Schema avroSchema, GenericData model) {
       this.parent = parent;
       GroupType parquetGroup = parquetSchema.asGroupType();
       this.memberConverters = new Converter[ parquetGroup.getFieldCount()];
@@ -471,7 +488,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
         Schema memberSchema = avroSchema.getTypes().get(index);
         if (!memberSchema.getType().equals(Schema.Type.NULL)) {
           Type memberType = parquetGroup.getType(parquetIndex);
-          memberConverters[parquetIndex] = newConverter(memberSchema, memberType, new ParentValueContainer() {
+          memberConverters[parquetIndex] = newConverter(memberSchema, memberType, model, new ParentValueContainer() {
             @Override
             void add(Object value) {
               Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type");
@@ -506,9 +523,9 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
     private Map<String, V> map;
 
     public MapConverter(ParentValueContainer parent, Type parquetSchema,
-        Schema avroSchema) {
+        Schema avroSchema, GenericData model) {
       this.parent = parent;
-      this.keyValueConverter = new MapKeyValueConverter(parquetSchema, avroSchema);
+      this.keyValueConverter = new MapKeyValueConverter(parquetSchema, avroSchema, model);
     }
 
     @Override
@@ -533,7 +550,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
       private final Converter keyConverter;
       private final Converter valueConverter;
 
-      public MapKeyValueConverter(Type parquetSchema, Schema avroSchema) {
+      public MapKeyValueConverter(Type parquetSchema, Schema avroSchema,
+          GenericData model) {
         keyConverter = new PrimitiveConverter() {
           @Override
           final public void addBinary(Binary value) {
@@ -543,7 +561,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter
 
         Type valueType = parquetSchema.asGroupType().getType(0).asGroupType().getType(1);
         Schema valueSchema = avroSchema.getValueType();
-        valueConverter = newConverter(valueSchema, valueType, new ParentValueContainer() {
+        valueConverter = newConverter(valueSchema, valueType, model, new ParentValueContainer() {
           @Override
           @SuppressWarnings("unchecked")
           void add(Object value) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/b5f6a3bd/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java
index 232e5c8..2e15500 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetInputFormat.java
@@ -67,4 +67,15 @@ public class AvroParquetInputFormat<T> extends ParquetInputFormat<T> {
     AvroReadSupport.setAvroReadSchema(ContextUtil.getConfiguration(job), avroReadSchema);
   }
 
+  /**
+   * Uses an instance of the specified {@link AvroDataSupplier} class to control how the
+   * {@link org.apache.avro.specific.SpecificData} instance that is used to find
+   * Avro specific records is created.
+   * @param job
+   * @param supplierClass
+   */
+  public static void setAvroDataSupplier(Job job,
+      Class<? extends AvroDataSupplier> supplierClass) {
+    AvroReadSupport.setAvroDataSupplier(ContextUtil.getConfiguration(job), supplierClass);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/b5f6a3bd/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
index 767fa40..2d53a30 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroReadSupport.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
 import parquet.hadoop.api.ReadSupport;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
@@ -37,6 +38,8 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
   static final String AVRO_SCHEMA_METADATA_KEY = "avro.schema";
   private static final String AVRO_READ_SCHEMA_METADATA_KEY = "avro.read.schema";
 
+  public static String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier";
+
   /**
    * @see parquet.avro.AvroParquetInputFormat#setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
    */
@@ -51,6 +54,11 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
     configuration.set(AVRO_READ_SCHEMA, avroReadSchema.toString());
   }
 
+  public static void setAvroDataSupplier(Configuration configuration,
+      Class<? extends AvroDataSupplier> clazz) {
+    configuration.set(AVRO_DATA_SUPPLIER, clazz.toString());
+  }
+
   @Override
   public ReadContext init(Configuration configuration, Map<String, String> keyValueMetaData, MessageType fileSchema) {
     MessageType schema = fileSchema;
@@ -84,6 +92,10 @@ public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
       // default to converting the Parquet schema into an Avro schema
       avroSchema = new AvroSchemaConverter().convert(parquetSchema);
     }
-    return new AvroRecordMaterializer<T>(parquetSchema, avroSchema);
+    Class<? extends AvroDataSupplier> suppClass = configuration.getClass(AVRO_DATA_SUPPLIER,
+        SpecificDataSupplier.class,
+        AvroDataSupplier.class);
+    AvroDataSupplier supplier =ReflectionUtils.newInstance(suppClass, configuration);
+    return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, supplier.get());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/b5f6a3bd/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java
index 03311d5..7623e18 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroRecordMaterializer.java
@@ -16,6 +16,7 @@
 package parquet.avro;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.IndexedRecord;
 import parquet.io.api.GroupConverter;
 import parquet.io.api.RecordMaterializer;
@@ -25,8 +26,9 @@ class AvroRecordMaterializer<T extends IndexedRecord> extends RecordMaterializer
 
   private AvroIndexedRecordConverter<T> root;
 
-  public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema) {
-    this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema);
+  public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema,
+      GenericData baseModel) {
+    this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/b5f6a3bd/parquet-avro/src/main/java/parquet/avro/SpecificDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/SpecificDataSupplier.java b/parquet-avro/src/main/java/parquet/avro/SpecificDataSupplier.java
new file mode 100644
index 0000000..8fb6dac
--- /dev/null
+++ b/parquet-avro/src/main/java/parquet/avro/SpecificDataSupplier.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+class SpecificDataSupplier implements AvroDataSupplier {
+  @Override
+  public GenericData get() {
+    return SpecificData.get();
+  }
+}


Mime
View raw message