Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 45AA5100DF for ; Thu, 19 Feb 2015 08:56:06 +0000 (UTC) Received: (qmail 82543 invoked by uid 500); 19 Feb 2015 08:56:06 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 82499 invoked by uid 500); 19 Feb 2015 08:56:06 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 82488 invoked by uid 99); 19 Feb 2015 08:56:06 -0000 Received: from eris.apache.org (HELO hades.apache.org) (140.211.11.105) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Feb 2015 08:56:06 +0000 Received: from hades.apache.org (localhost [127.0.0.1]) by hades.apache.org (ASF Mail Server at hades.apache.org) with ESMTP id E74D3AC0051 for ; Thu, 19 Feb 2015 08:56:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1660814 - in /hive/trunk/serde/src: java/org/apache/hadoop/hive/serde2/avro/ test/org/apache/hadoop/hive/serde2/avro/ Date: Thu, 19 Feb 2015 08:56:05 -0000 To: commits@hive.apache.org From: szehon@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20150219085605.E74D3AC0051@hades.apache.org> Author: szehon Date: Thu Feb 19 08:56:05 2015 New Revision: 1660814 URL: http://svn.apache.org/r1660814 Log: HIVE-7653 : Hive AvroSerDe does not support circular references in Schema (Sachin Goyal via Szehon) Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java?rev=1660814&r1=1660813&r2=1660814&view=diff ============================================================================== --- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (original) +++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java Thu Feb 19 08:56:05 2015 @@ -342,7 +342,8 @@ class AvroDeserializer { currentFileSchema = fileSchema; } } - return worker(datum, currentFileSchema, schema, SchemaToTypeInfo.generateTypeInfo(schema)); + return worker(datum, currentFileSchema, schema, + SchemaToTypeInfo.generateTypeInfo(schema, null)); } Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java?rev=1660814&r1=1660813&r2=1660814&view=diff ============================================================================== --- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java (original) +++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerializer.java Thu Feb 19 08:56:05 2015 @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; @@ -152,10 +153,12 @@ class AvroSerializer { final InstanceCache> enums = new InstanceCache>() { @Override - protected InstanceCache makeInstance(final Schema schema) { + protected InstanceCache makeInstance(final Schema schema, + Set seenSchemas) { return new InstanceCache() { @Override - protected GenericEnumSymbol makeInstance(Object seed) { + protected GenericEnumSymbol makeInstance(Object seed, + Set seenSchemas) { return new GenericData.EnumSymbol(schema, seed.toString()); } }; Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java?rev=1660814&r1=1660813&r2=1660814&view=diff ============================================================================== --- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java (original) +++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/InstanceCache.java Thu Feb 19 08:56:05 2015 @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import java.util.HashMap; +import java.util.Set; /** * Cache for objects whose creation only depends on some other set of objects @@ -41,6 +42,15 @@ public abstract class InstanceCache seenSchemas) throws AvroSerdeException { if(LOG.isDebugEnabled()) LOG.debug("Checking for hv: " + hv.toString()); if(cache.containsKey(hv.hashCode())) { @@ -50,10 +60,11 @@ public abstract class InstanceCache seenSchemas) throws AvroSerdeException; } Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java?rev=1660814&r1=1660813&r2=1660814&view=diff ============================================================================== --- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java (original) +++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/SchemaToTypeInfo.java Thu Feb 19 08:56:05 2015 @@ -30,8 +30,10 @@ import static org.apache.avro.Schema.Typ import java.util.ArrayList; import java.util.Collections; import java.util.Hashtable; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.avro.Schema; import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; @@ -82,12 +84,28 @@ class SchemaToTypeInfo { * @throws AvroSerdeException for problems during conversion. */ public static List generateColumnTypes(Schema schema) throws AvroSerdeException { + return generateColumnTypes (schema, null); + } + + /** + * Generate a list of of TypeInfos from an Avro schema. This method is + * currently public due to some weirdness in deserializing unions, but + * will be made private once that is resolved. + * @param schema Schema to generate field types for + * @param seenSchemas stores schemas processed in the parsing done so far, + * helping to resolve circular references in the schema + * @return List of TypeInfos, each element of which is a TypeInfo derived + * from the schema. + * @throws AvroSerdeException for problems during conversion. + */ + public static List generateColumnTypes(Schema schema, + Set seenSchemas) throws AvroSerdeException { List fields = schema.getFields(); List types = new ArrayList(fields.size()); for (Schema.Field field : fields) { - types.add(generateTypeInfo(field.schema())); + types.add(generateTypeInfo(field.schema(), seenSchemas)); } return types; @@ -95,17 +113,22 @@ class SchemaToTypeInfo { static InstanceCache typeInfoCache = new InstanceCache() { @Override - protected TypeInfo makeInstance(Schema s) throws AvroSerdeException { - return generateTypeInfoWorker(s); + protected TypeInfo makeInstance(Schema s, + Set seenSchemas) + throws AvroSerdeException { + return generateTypeInfoWorker(s, seenSchemas); } }; /** * Convert an Avro Schema into an equivalent Hive TypeInfo. * @param schema to record. Must be of record type. + * @param seenSchemas stores schemas processed in the parsing done so far, + * helping to resolve circular references in the schema * @return TypeInfo matching the Avro schema * @throws AvroSerdeException for any problems during conversion. */ - public static TypeInfo generateTypeInfo(Schema schema) throws AvroSerdeException { + public static TypeInfo generateTypeInfo(Schema schema, + Set seenSchemas) throws AvroSerdeException { // For bytes type, it can be mapped to decimal. Schema.Type type = schema.getType(); if (type == BYTES && AvroSerDe.DECIMAL_TYPE_NAME @@ -160,14 +183,16 @@ class SchemaToTypeInfo { return TypeInfoFactory.timestampTypeInfo; } - return typeInfoCache.retrieve(schema); + return typeInfoCache.retrieve(schema, seenSchemas); } - private static TypeInfo generateTypeInfoWorker(Schema schema) throws AvroSerdeException { + private static TypeInfo generateTypeInfoWorker(Schema schema, + Set seenSchemas) throws AvroSerdeException { // Avro requires NULLable types to be defined as unions of some type T // and NULL. This is annoying and we're going to hide it from the user. if(AvroSerdeUtils.isNullableType(schema)) { - return generateTypeInfo(AvroSerdeUtils.getOtherTypeFromNullableType(schema)); + return generateTypeInfo( + AvroSerdeUtils.getOtherTypeFromNullableType(schema), seenSchemas); } Schema.Type type = schema.getType(); @@ -176,25 +201,33 @@ class SchemaToTypeInfo { } switch(type) { - case RECORD: return generateRecordTypeInfo(schema); - case MAP: return generateMapTypeInfo(schema); - case ARRAY: return generateArrayTypeInfo(schema); - case UNION: return generateUnionTypeInfo(schema); + case RECORD: return generateRecordTypeInfo(schema, seenSchemas); + case MAP: return generateMapTypeInfo(schema, seenSchemas); + case ARRAY: return generateArrayTypeInfo(schema, seenSchemas); + case UNION: return generateUnionTypeInfo(schema, seenSchemas); case ENUM: return generateEnumTypeInfo(schema); default: throw new AvroSerdeException("Do not yet support: " + schema); } } - private static TypeInfo generateRecordTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateRecordTypeInfo(Schema schema, + Set seenSchemas) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.RECORD); + if (seenSchemas == null) { + seenSchemas = Collections.newSetFromMap(new IdentityHashMap()); + } else if (seenSchemas.contains(schema)) { + return primitiveTypeToTypeInfo.get(Schema.Type.NULL); + } + seenSchemas.add(schema); + List fields = schema.getFields(); List fieldNames = new ArrayList(fields.size()); List typeInfos = new ArrayList(fields.size()); for(int i = 0; i < fields.size(); i++) { fieldNames.add(i, fields.get(i).name()); - typeInfos.add(i, generateTypeInfo(fields.get(i).schema())); + typeInfos.add(i, generateTypeInfo(fields.get(i).schema(), seenSchemas)); } return TypeInfoFactory.getStructTypeInfo(fieldNames, typeInfos); @@ -204,23 +237,26 @@ class SchemaToTypeInfo { * Generate a TypeInfo for an Avro Map. This is made slightly simpler in that * Avro only allows maps with strings for keys. */ - private static TypeInfo generateMapTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateMapTypeInfo(Schema schema, + Set seenSchemas) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.MAP); Schema valueType = schema.getValueType(); - TypeInfo ti = generateTypeInfo(valueType); + TypeInfo ti = generateTypeInfo(valueType, seenSchemas); return TypeInfoFactory.getMapTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"), ti); } - private static TypeInfo generateArrayTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateArrayTypeInfo(Schema schema, + Set seenSchemas) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.ARRAY); Schema itemsType = schema.getElementType(); - TypeInfo itemsTypeInfo = generateTypeInfo(itemsType); + TypeInfo itemsTypeInfo = generateTypeInfo(itemsType, seenSchemas); return TypeInfoFactory.getListTypeInfo(itemsTypeInfo); } - private static TypeInfo generateUnionTypeInfo(Schema schema) throws AvroSerdeException { + private static TypeInfo generateUnionTypeInfo(Schema schema, + Set seenSchemas) throws AvroSerdeException { assert schema.getType().equals(Schema.Type.UNION); List types = schema.getTypes(); @@ -228,7 +264,7 @@ class SchemaToTypeInfo { List typeInfos = new ArrayList(types.size()); for(Schema type : types) { - typeInfos.add(generateTypeInfo(type)); + typeInfos.add(generateTypeInfo(type, seenSchemas)); } return TypeInfoFactory.getUnionTypeInfo(typeInfos); Modified: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java?rev=1660814&r1=1660813&r2=1660814&view=diff ============================================================================== --- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java (original) +++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroSerializer.java Thu Feb 19 08:56:05 2015 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.serde2.avro; +import org.apache.avro.reflect.ReflectData; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -486,4 +487,47 @@ public class TestAvroSerializer { assertArrayEquals(fixed.bytes(), ((GenericData.Fixed) r.get("fixed1")).bytes()); } + @Test + public void canSerializeCyclesInSchema() throws SerDeException, IOException { + // Create parent-child avro-record and avro-schema + AvroCycleParent parent = new AvroCycleParent(); + AvroCycleChild child = new AvroCycleChild(); + parent.setChild (child); + Schema parentS = ReflectData.AllowNull.get().getSchema(AvroCycleParent.class); + GenericData.Record parentRec = new GenericData.Record(parentS); + Schema childS = ReflectData.AllowNull.get().getSchema(AvroCycleChild.class); + GenericData.Record childRec = new GenericData.Record(childS); + parentRec.put("child", childRec); + + // Initialize Avro SerDe + AvroSerializer as = new AvroSerializer(); + AvroDeserializer ad = new AvroDeserializer(); + AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(parentS); + ObjectInspector oi = aoig.getObjectInspector(); + List columnNames = aoig.getColumnNames(); + List columnTypes = aoig.getColumnTypes(); + + // Check serialization and deserialization + AvroGenericRecordWritable agrw = Utils.serializeAndDeserializeRecord(parentRec); + Object obj = ad.deserialize(columnNames, columnTypes, agrw, parentS); + + Writable result = as.serialize(obj, oi, columnNames, columnTypes, parentS); + assertTrue(result instanceof AvroGenericRecordWritable); + GenericRecord r2 = ((AvroGenericRecordWritable) result).getRecord(); + assertEquals(parentS, r2.getSchema()); + } + + private static class AvroCycleParent { + AvroCycleChild child; + public AvroCycleChild getChild () {return child;} + public void setChild (AvroCycleChild child) {this.child = child;} + } + + private static class AvroCycleChild { + AvroCycleParent parent; + AvroCycleChild next; + Map map; + public AvroCycleParent getParent () {return parent;} + public void setParent (AvroCycleParent parent) {this.parent = parent;} + } } Modified: hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java?rev=1660814&r1=1660813&r2=1660814&view=diff ============================================================================== --- hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java (original) +++ hive/trunk/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestInstanceCache.java Thu Feb 19 08:56:05 2015 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.serde2.avro; +import java.util.Set; import org.junit.Test; import static org.junit.Assert.assertSame; @@ -41,18 +42,19 @@ public class TestInstanceCache { public void instanceCachesOnlyCreateOneInstance() throws AvroSerdeException { InstanceCache> ic = new InstanceCache>() { @Override - protected Wrapper makeInstance(Foo hv) { + protected Wrapper makeInstance(Foo hv, + Set seenSchemas) { return new Wrapper(hv); } }; Foo f1 = new Foo(); - Wrapper fc = ic.retrieve(f1); + Wrapper fc = ic.retrieve(f1, null); assertSame(f1, fc.wrapped); // Our original foo should be in the wrapper Foo f2 = new Foo(); // Different instance, same value - Wrapper fc2 = ic.retrieve(f2); + Wrapper fc2 = ic.retrieve(f2, null); assertSame(fc2,fc); // Since equiv f, should get back first container assertSame(fc2.wrapped, f1); } @@ -60,19 +62,20 @@ public class TestInstanceCache { @Test public void instanceCacheReturnsCorrectInstances() throws AvroSerdeException { InstanceCache> ic = new InstanceCache>() { - @Override - protected Wrapper makeInstance(String hv) { - return new Wrapper(hv); - } - }; + @Override + protected Wrapper makeInstance( + String hv, Set seenSchemas) { + return new Wrapper(hv); + } + }; - Wrapper one = ic.retrieve("one"); - Wrapper two = ic.retrieve("two"); + Wrapper one = ic.retrieve("one", null); + Wrapper two = ic.retrieve("two", null); - Wrapper anotherOne = ic.retrieve("one"); + Wrapper anotherOne = ic.retrieve("one", null); assertSame(one, anotherOne); - Wrapper anotherTwo = ic.retrieve("two"); + Wrapper anotherTwo = ic.retrieve("two", null); assertSame(two, anotherTwo); } }