Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5A9CA10FE8 for ; Sat, 4 Jan 2014 02:09:37 +0000 (UTC) Received: (qmail 17244 invoked by uid 500); 4 Jan 2014 02:09:37 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 17188 invoked by uid 500); 4 Jan 2014 02:09:36 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 17171 invoked by uid 99); 4 Jan 2014 02:09:36 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 04 Jan 2014 02:09:36 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 22409820903; Sat, 4 Jan 2014 02:09:35 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Date: Sat, 04 Jan 2014 02:09:35 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/5] git commit: CRUNCH-312: Determine the right datum reader/writer for derived Avro types Updated Branches: refs/heads/apache-crunch-0.8 43285d1d5 -> ffe89ea3f CRUNCH-312: Determine the right datum reader/writer for derived Avro types Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d06fd4a Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d06fd4a Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d06fd4a Branch: refs/heads/apache-crunch-0.8 Commit: 5d06fd4a6b9d6b3f623af13969dc4ec523397f20 Parents: 43285d1 Author: Josh Wills Authored: Thu Dec 19 12:04:20 2013 -0800 Committer: Josh Wills Committed: Fri Jan 3 17:36:57 2014 -0800 ---------------------------------------------------------------------- .../apache/crunch/io/avro/AvroReflectIT.java | 33 +++++++++++++ .../apache/crunch/types/avro/AvroTableType.java | 4 +- .../org/apache/crunch/types/avro/AvroType.java | 51 ++++++++++++++++---- .../org/apache/crunch/types/avro/Avros.java | 34 +++++++------ 4 files changed, 96 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java index 7a90517..6f09cae 100644 --- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java @@ -28,12 +28,14 @@ import org.apache.crunch.MapFn; import org.apache.crunch.PCollection; import org.apache.crunch.Pair; import org.apache.crunch.Pipeline; +import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.lib.Aggregate; import org.apache.crunch.test.Person; import org.apache.crunch.test.StringWrapper; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PType; import org.apache.crunch.types.avro.Avros; import org.junit.Assume; import org.junit.Rule; @@ -106,4 +108,35 @@ public class AvroReflectIT implements Serializable { assertEquals(expected, materialized); pipeline.done(); } + + private static PType STRING_PTYPE = Avros.derived(String.class, + new MapFn() { public String map(StringWrapper in) { return in.getValue(); }}, + new MapFn() { public StringWrapper map(String out) { return new StringWrapper(out); }}, + Avros.reflects(StringWrapper.class)); + + @Test + public void testDerivedReflection() throws Exception { + Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration()); + PCollection stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")) + .parallelDo(IdentityFn.getInstance(), STRING_PTYPE); + List strings = Lists.newArrayList(stringWrapperCollection.materialize()); + pipeline.done(); + assertEquals(Lists.newArrayList("b", "c", "a", "e"), strings); + } + + @Test + public void testWrappedDerivedReflection() throws Exception { + Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration()); + PCollection> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")) + .parallelDo(new MapFn>() { + @Override + public Pair map(String input) { + return Pair.of(1L, input); + } + }, Avros.pairs(Avros.longs(), STRING_PTYPE)); + List> pairs = Lists.newArrayList(stringWrapperCollection.materialize()); + pipeline.done(); + assertEquals(pairs.size(), 4); + assertEquals(Pair.of(1L, "a"), pairs.get(2)); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java index 86613df..8e9e069 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java @@ -123,8 +123,8 @@ class AvroTableType extends AvroType> implements PTableType keyType, AvroType valueType, Class> pairClass) { super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(), valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(), - valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier( - Pair.class, keyType, valueType), keyType, valueType); + valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), + new TupleDeepCopier(Pair.class, keyType, valueType), null, keyType, valueType); this.keyType = keyType; this.valueType = valueType; } http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java index aea4951..4e35b91 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java @@ -44,6 +44,12 @@ import com.google.common.collect.Lists; */ public class AvroType implements PType { + public enum AvroRecordType { + REFLECT, + SPECIFIC, + GENERIC + } + private static final Converter AVRO_CONVERTER = new AvroKeyConverter(); private final Class typeClass; @@ -52,15 +58,16 @@ public class AvroType implements PType { private final MapFn baseInputMapFn; private final MapFn baseOutputMapFn; private final List subTypes; + private AvroRecordType recordType; private DeepCopier deepCopier; private boolean initialized = false; public AvroType(Class typeClass, Schema schema, DeepCopier deepCopier, PType... ptypes) { - this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes); + this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, null, ptypes); } public AvroType(Class typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn, - DeepCopier deepCopier, PType... ptypes) { + DeepCopier deepCopier, AvroRecordType recordType, PType... ptypes) { this.typeClass = typeClass; this.schema = Preconditions.checkNotNull(schema); this.schemaString = schema.toString(); @@ -68,6 +75,23 @@ public class AvroType implements PType { this.baseOutputMapFn = outputMapFn; this.deepCopier = deepCopier; this.subTypes = ImmutableList. builder().add(ptypes).build(); + this.recordType = recordType; + } + + private AvroRecordType determineRecordType() { + if (checkReflect()) { + return AvroRecordType.REFLECT; + } else if (checkSpecific()) { + return AvroRecordType.SPECIFIC; + } + return AvroRecordType.GENERIC; + } + + public AvroRecordType getRecordType() { + if (recordType == null) { + recordType = determineRecordType(); + } + return recordType; } @Override @@ -98,14 +122,17 @@ public class AvroType implements PType { * @return true if the wrapped type is a specific data type or wraps one */ public boolean hasSpecific() { - if (Avros.isPrimitive(this)) { + return getRecordType() == AvroRecordType.SPECIFIC; + } + + private boolean checkSpecific() { + if (Avros.isPrimitive(typeClass)) { return false; } - if (!this.subTypes.isEmpty()) { - for (PType subType : this.subTypes) { - AvroType atype = (AvroType) subType; - if (atype.hasSpecific()) { + if (!subTypes.isEmpty()) { + for (PType subType : subTypes) { + if (((AvroType) subType).hasSpecific()) { return true; } } @@ -130,12 +157,16 @@ public class AvroType implements PType { * @return true if the wrapped type is a reflection-based type or wraps one. */ public boolean hasReflect() { - if (Avros.isPrimitive(this)) { + return getRecordType() == AvroRecordType.REFLECT; + } + + private boolean checkReflect() { + if (Avros.isPrimitive(typeClass)) { return false; } - if (!this.subTypes.isEmpty()) { - for (PType subType : this.subTypes) { + if (!subTypes.isEmpty()) { + for (PType subType : subTypes) { if (((AvroType) subType).hasReflect()) { return true; } http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java index 8d6bdd6..3d6b04f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java +++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java @@ -180,7 +180,7 @@ public class Avros { }; private static final AvroType strings = new AvroType(String.class, Schema.create(Schema.Type.STRING), - UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier()); + UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier(), AvroType.AvroRecordType.GENERIC); private static final AvroType nulls = create(Void.class, Schema.Type.NULL); private static final AvroType longs = create(Long.class, Schema.Type.LONG); private static final AvroType ints = create(Integer.class, Schema.Type.INT); @@ -188,12 +188,12 @@ public class Avros { private static final AvroType doubles = create(Double.class, Schema.Type.DOUBLE); private static final AvroType booleans = create(Boolean.class, Schema.Type.BOOLEAN); private static final AvroType bytes = new AvroType(ByteBuffer.class, - Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier()); + Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), + new DeepCopier.NoOpDeepCopier(), AvroType.AvroRecordType.GENERIC); private static final Map, PType> PRIMITIVES = ImmutableMap., PType> builder() .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats) .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build(); - private static final Map, AvroType> EXTENSIONS = Maps.newHashMap(); public static void register(Class clazz, AvroType ptype) { @@ -208,6 +208,10 @@ public class Avros { return avroType.getTypeClass().isPrimitive() || PRIMITIVES.containsKey(avroType.getTypeClass()); } + static boolean isPrimitive(Class typeClass) { + return typeClass.isPrimitive() || PRIMITIVES.containsKey(typeClass); + } + private static AvroType create(Class clazz, Schema.Type schemaType) { return new AvroType(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier()); } @@ -315,7 +319,7 @@ public class Avros { public static final AvroType writables(Class clazz) { return new AvroType(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn(clazz), - new WritableToBytesMapFn(), new WritableDeepCopier(clazz)); + new WritableToBytesMapFn(), new WritableDeepCopier(clazz), AvroType.AvroRecordType.GENERIC); } private static class GenericDataArrayToCollection extends MapFn> { @@ -403,7 +407,8 @@ public class Avros { Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema())); GenericDataArrayToCollection input = new GenericDataArrayToCollection(avroType.getInputMapFn()); CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn()); - return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier(ptype), ptype); + return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier(ptype), + avroType.getRecordType(), ptype); } private static class AvroMapToMap extends MapFn, Map> { @@ -475,7 +480,8 @@ public class Avros { Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema())); AvroMapToMap inputFn = new AvroMapToMap(avroType.getInputMapFn()); MapToAvroMap outputFn = new MapToAvroMap(avroType.getOutputMapFn()); - return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier(ptype), ptype); + return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier(ptype), + avroType.getRecordType(), ptype); } private static class GenericRecordToTuple extends MapFn { @@ -609,27 +615,27 @@ public class Avros { Schema schema = createTupleSchema(p1, p2); GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2); TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2); - return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2); + return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), null, p1, p2); } public static final AvroType> triples(PType p1, PType p2, PType p3) { Schema schema = createTupleSchema(p1, p2, p3); return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3), - new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3); + new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), null, p1, p2, p3); } public static final AvroType> quads(PType p1, PType p2, PType p3, PType p4) { Schema schema = createTupleSchema(p1, p2, p3, p4); return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4), - new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2, - p3, p4); + new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), null, + p1, p2, p3, p4); } public static final AvroType tuples(PType... ptypes) { Schema schema = createTupleSchema(ptypes); return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes), - new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes); + new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), null, ptypes); } public static AvroType tuples(Class clazz, PType... ptypes) { @@ -640,7 +646,7 @@ public class Avros { } TupleFactory factory = TupleFactory.create(clazz, typeArgs); return new AvroType(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema, - ptypes), new TupleDeepCopier(clazz, ptypes), ptypes); + ptypes), new TupleDeepCopier(clazz, ptypes), null, ptypes); } private static Schema createTupleSchema(PType... ptypes) throws RuntimeException { @@ -668,8 +674,8 @@ public class Avros { PType base) { AvroType abase = (AvroType) base; return new AvroType(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn), - new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier(), base.getSubTypes() - .toArray(new PType[0])); + new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier(), abase.getRecordType(), + base.getSubTypes().toArray(new PType[0])); } public static PType jsons(Class clazz) {