hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sze...@apache.org
Subject svn commit: r1660814 - in /hive/trunk/serde/src: java/org/apache/hadoop/hive/serde2/avro/ test/org/apache/hadoop/hive/serde2/avro/
Date Thu, 19 Feb 2015 08:56:05 GMT
Author: szehon
Date: Thu Feb 19 08:56:05 2015
New Revision: 1660814

URL: http://svn.apache.org/r1660814
Log:
HIVE-7653 : Hive AvroSerDe does not support circular references in Schema (Sachin Goyal via Szehon)

Modified:
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
    hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
    hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java?rev=1660814&r1=1660813&r2=1660814&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java Thu Feb 19 08:56:05 2015
@@ -342,7 +342,8 @@ class AvroDeserializer {
         currentFileSchema = fileSchema;
       }
     }
-    return worker(datum, currentFileSchema, schema, SchemaToTypeInfo.generateTypeInfo(schema));
+    return worker(datum, currentFileSchema, schema,
+      SchemaToTypeInfo.generateTypeInfo(schema, null));
 
   }
 

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java?rev=1660814&r1=1660813&r2=1660814&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java Thu Feb 19 08:56:05 2015
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
@@ -152,10 +153,12 @@ class AvroSerializer {
   final InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>> enums
       = new InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>>() {
           @Override
-          protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema) {
+          protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema,
+                     Set<Schema> seenSchemas) {
             return new InstanceCache<Object, GenericEnumSymbol>() {
               @Override
-              protected GenericEnumSymbol makeInstance(Object seed) {
+              protected GenericEnumSymbol makeInstance(Object seed,
+                             Set<Object> seenSchemas) {
                 return new GenericData.EnumSymbol(schema, seed.toString());
               }
             };

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java?rev=1660814&r1=1660813&r2=1660814&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java Thu Feb 19 08:56:05 2015
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import java.util.HashMap;
+import java.util.Set;
 
 /**
  * Cache for objects whose creation only depends on some other set of objects
@@ -41,6 +42,15 @@ public abstract class InstanceCache<Seed
    * SeedObject
    */
   public Instance retrieve(SeedObject hv) throws AvroSerdeException {
+    return retrieve(hv, null);
+  }
+
+  /**
+   * Retrieve (or create if it doesn't exist) the correct Instance for this
+   * SeedObject using 'seenSchemas' to resolve circular references
+   */
+  public Instance retrieve(SeedObject hv,
+      Set<SeedObject> seenSchemas) throws AvroSerdeException {
     if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString());
 
     if(cache.containsKey(hv.hashCode())) {
@@ -50,10 +60,11 @@ public abstract class InstanceCache<Seed
 
     if(LOG.isDebugEnabled()) LOG.debug("Creating new instance and storing in cache");
 
-    Instance instance = makeInstance(hv);
+    Instance instance = makeInstance(hv, seenSchemas);
     cache.put(hv.hashCode(), instance);
     return instance;
   }
 
-  protected abstract Instance makeInstance(SeedObject hv) throws AvroSerdeException;
+  protected abstract Instance makeInstance(SeedObject hv,
+      Set<SeedObject> seenSchemas) throws AvroSerdeException;
 }

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java?rev=1660814&r1=1660813&r2=1660814&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java Thu Feb 19 08:56:05 2015
@@ -30,8 +30,10 @@ import static org.apache.avro.Schema.Typ
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Hashtable;
+import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
@@ -82,12 +84,28 @@ class SchemaToTypeInfo {
    * @throws AvroSerdeException for problems during conversion.
    */
   public static List<TypeInfo> generateColumnTypes(Schema schema) throws AvroSerdeException {
+    return generateColumnTypes (schema, null);
+  }
+
+  /**
+   * Generate a list of of TypeInfos from an Avro schema.  This method is
+   * currently public due to some weirdness in deserializing unions, but
+   * will be made private once that is resolved.
+   * @param schema Schema to generate field types for
+   * @param seenSchemas stores schemas processed in the parsing done so far,
+   *         helping to resolve circular references in the schema
+   * @return List of TypeInfos, each element of which is a TypeInfo derived
+   *         from the schema.
+   * @throws AvroSerdeException for problems during conversion.
+   */
+  public static List<TypeInfo> generateColumnTypes(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
     List<Schema.Field> fields = schema.getFields();
 
     List<TypeInfo> types = new ArrayList<TypeInfo>(fields.size());
 
     for (Schema.Field field : fields) {
-      types.add(generateTypeInfo(field.schema()));
+      types.add(generateTypeInfo(field.schema(), seenSchemas));
     }
 
     return types;
@@ -95,17 +113,22 @@ class SchemaToTypeInfo {
 
   static InstanceCache<Schema, TypeInfo> typeInfoCache = new InstanceCache<Schema, TypeInfo>() {
                                   @Override
-                                  protected TypeInfo makeInstance(Schema s) throws AvroSerdeException {
-                                    return generateTypeInfoWorker(s);
+                                  protected TypeInfo makeInstance(Schema s,
+                                      Set<Schema> seenSchemas)
+                                      throws AvroSerdeException {
+                                    return generateTypeInfoWorker(s, seenSchemas);
                                   }
                                 };
   /**
    * Convert an Avro Schema into an equivalent Hive TypeInfo.
    * @param schema to record. Must be of record type.
+   * @param seenSchemas stores schemas processed in the parsing done so far,
+   *         helping to resolve circular references in the schema
    * @return TypeInfo matching the Avro schema
    * @throws AvroSerdeException for any problems during conversion.
    */
-  public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException {
+  public static TypeInfo generateTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
     // For bytes type, it can be mapped to decimal.
     Schema.Type type = schema.getType();
     if (type == BYTES && AvroSerDe.DECIMAL_TYPE_NAME
@@ -160,14 +183,16 @@ class SchemaToTypeInfo {
       return TypeInfoFactory.timestampTypeInfo;
     }
 
-    return typeInfoCache.retrieve(schema);
+    return typeInfoCache.retrieve(schema, seenSchemas);
   }
 
-  private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateTypeInfoWorker(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
     // Avro requires NULLable types to be defined as unions of some type T
     // and NULL.  This is annoying and we're going to hide it from the user.
     if(AvroSerdeUtils.isNullableType(schema)) {
-      return generateTypeInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema));
+      return generateTypeInfo(
+        AvroSerdeUtils.getOtherTypeFromNullableType(schema), seenSchemas);
     }
 
     Schema.Type type = schema.getType();
@@ -176,25 +201,33 @@ class SchemaToTypeInfo {
     }
 
     switch(type) {
-      case RECORD: return generateRecordTypeInfo(schema);
-      case MAP:    return generateMapTypeInfo(schema);
-      case ARRAY:  return generateArrayTypeInfo(schema);
-      case UNION:  return generateUnionTypeInfo(schema);
+      case RECORD: return generateRecordTypeInfo(schema, seenSchemas);
+      case MAP:    return generateMapTypeInfo(schema, seenSchemas);
+      case ARRAY:  return generateArrayTypeInfo(schema, seenSchemas);
+      case UNION:  return generateUnionTypeInfo(schema, seenSchemas);
       case ENUM:   return generateEnumTypeInfo(schema);
       default:     throw new AvroSerdeException("Do not yet support: " + schema);
     }
   }
 
-  private static TypeInfo generateRecordTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateRecordTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.RECORD);
 
+    if (seenSchemas == null) {
+        seenSchemas = Collections.newSetFromMap(new IdentityHashMap<Schema, Boolean>());
+    } else if (seenSchemas.contains(schema)) {
+        return primitiveTypeToTypeInfo.get(Schema.Type.NULL);
+    }
+    seenSchemas.add(schema);
+
     List<Schema.Field> fields = schema.getFields();
     List<String> fieldNames = new ArrayList<String>(fields.size());
     List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(fields.size());
 
     for(int i = 0; i < fields.size(); i++) {
       fieldNames.add(i, fields.get(i).name());
-      typeInfos.add(i, generateTypeInfo(fields.get(i).schema()));
+      typeInfos.add(i, generateTypeInfo(fields.get(i).schema(), seenSchemas));
     }
 
     return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos);
@@ -204,23 +237,26 @@ class SchemaToTypeInfo {
    * Generate a TypeInfo for an Avro Map.  This is made slightly simpler in that
    * Avro only allows maps with strings for keys.
    */
-  private static TypeInfo generateMapTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateMapTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.MAP);
     Schema valueType = schema.getValueType();
-    TypeInfo ti = generateTypeInfo(valueType);
+    TypeInfo ti = generateTypeInfo(valueType, seenSchemas);
 
     return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti);
   }
 
-  private static TypeInfo generateArrayTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateArrayTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.ARRAY);
     Schema itemsType = schema.getElementType();
-    TypeInfo itemsTypeInfo = generateTypeInfo(itemsType);
+    TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas);
 
     return TypeInfoFactory.getListTypeInfo(itemsTypeInfo);
   }
 
-  private static TypeInfo generateUnionTypeInfo(Schema schema) throws AvroSerdeException {
+  private static TypeInfo generateUnionTypeInfo(Schema schema,
+      Set<Schema> seenSchemas) throws AvroSerdeException {
     assert schema.getType().equals(Schema.Type.UNION);
     List<Schema> types = schema.getTypes();
 
@@ -228,7 +264,7 @@ class SchemaToTypeInfo {
     List<TypeInfo> typeInfos = new ArrayList<TypeInfo>(types.size());
 
     for(Schema type : types) {
-      typeInfos.add(generateTypeInfo(type));
+      typeInfos.add(generateTypeInfo(type, seenSchemas));
     }
 
     return TypeInfoFactory.getUnionTypeInfo(typeInfos);

Modified: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java?rev=1660814&r1=1660813&r2=1660814&view=diff
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java (original)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java Thu Feb 19 08:56:05 2015
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -486,4 +487,47 @@ public class TestAvroSerializer {
     assertArrayEquals(fixed.bytes(), ((GenericData.Fixed) r.get("fixed1")).bytes());
   }
 
+  @Test
+  public void canSerializeCyclesInSchema() throws SerDeException, IOException {
+    // Create parent-child avro-record and avro-schema
+    AvroCycleParent parent = new AvroCycleParent();
+    AvroCycleChild child = new AvroCycleChild();
+    parent.setChild (child);
+    Schema parentS = ReflectData.AllowNull.get().getSchema(AvroCycleParent.class);
+    GenericData.Record parentRec = new GenericData.Record(parentS);
+    Schema childS = ReflectData.AllowNull.get().getSchema(AvroCycleChild.class);
+    GenericData.Record childRec  = new GenericData.Record(childS);
+    parentRec.put("child", childRec);
+
+    // Initialize Avro SerDe
+    AvroSerializer as = new AvroSerializer();
+    AvroDeserializer ad = new AvroDeserializer();
+    AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(parentS);
+    ObjectInspector oi = aoig.getObjectInspector();
+    List<String> columnNames = aoig.getColumnNames();
+    List<TypeInfo> columnTypes = aoig.getColumnTypes();
+
+    // Check serialization and deserialization
+    AvroGenericRecordWritable agrw = Utils.serializeAndDeserializeRecord(parentRec);
+    Object obj = ad.deserialize(columnNames, columnTypes, agrw, parentS);
+
+    Writable result = as.serialize(obj, oi, columnNames, columnTypes, parentS);
+    assertTrue(result instanceof AvroGenericRecordWritable);
+    GenericRecord r2 = ((AvroGenericRecordWritable) result).getRecord();
+    assertEquals(parentS, r2.getSchema());
+  }
+
+  private static class AvroCycleParent {
+    AvroCycleChild child;
+    public AvroCycleChild getChild () {return child;}
+    public void setChild (AvroCycleChild child) {this.child = child;}
+  }
+
+  private static class AvroCycleChild {
+    AvroCycleParent parent;
+    AvroCycleChild next;
+    Map <String, AvroCycleParent> map;
+    public AvroCycleParent getParent () {return parent;}
+    public void setParent (AvroCycleParent parent) {this.parent = parent;}
+  }
 }

Modified: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java?rev=1660814&r1=1660813&r2=1660814&view=diff
==============================================================================
--- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java (original)
+++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java Thu Feb 19 08:56:05 2015
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.serde2.avro;
 
+import java.util.Set;
 import org.junit.Test;
 
 import static org.junit.Assert.assertSame;
@@ -41,18 +42,19 @@ public class TestInstanceCache {
   public void instanceCachesOnlyCreateOneInstance() throws AvroSerdeException {
     InstanceCache<Foo, Wrapper<Foo>> ic = new InstanceCache<Foo, Wrapper<Foo>>() {
                                            @Override
-                                           protected Wrapper makeInstance(Foo hv) {
+                                           protected Wrapper makeInstance(Foo hv,
+                                               Set<Foo> seenSchemas) {
                                              return new Wrapper(hv);
                                            }
                                           };
     Foo f1 = new Foo();
 
-    Wrapper fc = ic.retrieve(f1);
+    Wrapper fc = ic.retrieve(f1, null);
     assertSame(f1, fc.wrapped); // Our original foo should be in the wrapper
 
     Foo f2 = new Foo(); // Different instance, same value
 
-    Wrapper fc2 = ic.retrieve(f2);
+    Wrapper fc2 = ic.retrieve(f2, null);
     assertSame(fc2,fc); // Since equiv f, should get back first container
     assertSame(fc2.wrapped, f1);
   }
@@ -60,19 +62,20 @@ public class TestInstanceCache {
   @Test
   public void instanceCacheReturnsCorrectInstances() throws AvroSerdeException {
     InstanceCache<String, Wrapper<String>> ic = new InstanceCache<String, Wrapper<String>>() {
-                                                    @Override
-                                                    protected Wrapper<String> makeInstance(String hv) {
-                                                      return new Wrapper<String>(hv);
-                                                    }
-                                                  };
+                                   @Override
+                                   protected Wrapper<String> makeInstance(
+                                       String hv, Set<String> seenSchemas) {
+                                     return new Wrapper<String>(hv);
+                                   }
+                                 };
 
-    Wrapper<String> one = ic.retrieve("one");
-    Wrapper<String> two = ic.retrieve("two");
+    Wrapper<String> one = ic.retrieve("one", null);
+    Wrapper<String> two = ic.retrieve("two", null);
 
-    Wrapper<String> anotherOne = ic.retrieve("one");
+    Wrapper<String> anotherOne = ic.retrieve("one", null);
     assertSame(one, anotherOne);
 
-    Wrapper<String> anotherTwo = ic.retrieve("two");
+    Wrapper<String> anotherTwo = ic.retrieve("two", null);
     assertSame(two, anotherTwo);
   }
 }



Mime
View raw message