crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [1/5] git commit: CRUNCH-312: Determine the right datum reader/writer for derived Avro types
Date Sat, 04 Jan 2014 02:09:35 GMT
Updated Branches:
  refs/heads/apache-crunch-0.8 43285d1d5 -> ffe89ea3f


CRUNCH-312: Determine the right datum reader/writer for derived Avro types


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d06fd4a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d06fd4a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d06fd4a

Branch: refs/heads/apache-crunch-0.8
Commit: 5d06fd4a6b9d6b3f623af13969dc4ec523397f20
Parents: 43285d1
Author: Josh Wills <jwills@apache.org>
Authored: Thu Dec 19 12:04:20 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Fri Jan 3 17:36:57 2014 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/avro/AvroReflectIT.java    | 33 +++++++++++++
 .../apache/crunch/types/avro/AvroTableType.java |  4 +-
 .../org/apache/crunch/types/avro/AvroType.java  | 51 ++++++++++++++++----
 .../org/apache/crunch/types/avro/Avros.java     | 34 +++++++------
 4 files changed, 96 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
index 7a90517..6f09cae 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroReflectIT.java
@@ -28,12 +28,14 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.types.avro.Avros;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -106,4 +108,35 @@ public class AvroReflectIT implements Serializable {
     assertEquals(expected, materialized);
     pipeline.done();
   }
+
+  private static PType<String> STRING_PTYPE = Avros.derived(String.class,
+      new MapFn<StringWrapper, String>() { public String map(StringWrapper in) { return in.getValue(); }},
+      new MapFn<String, StringWrapper>() { public StringWrapper map(String out) { return new StringWrapper(out); }},
+      Avros.reflects(StringWrapper.class));
+
+  @Test
+  public void testDerivedReflection() throws Exception {
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<String> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+        .parallelDo(IdentityFn.<String>getInstance(), STRING_PTYPE);
+    List<String> strings = Lists.newArrayList(stringWrapperCollection.materialize());
+    pipeline.done();
+    assertEquals(Lists.newArrayList("b", "c", "a", "e"), strings);
+  }
+
+  @Test
+  public void testWrappedDerivedReflection() throws Exception {
+    Pipeline pipeline = new MRPipeline(AvroReflectIT.class, tmpDir.getDefaultConfiguration());
+    PCollection<Pair<Long, String>> stringWrapperCollection = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+        .parallelDo(new MapFn<String, Pair<Long, String>>() {
+          @Override
+          public Pair<Long, String> map(String input) {
+            return Pair.of(1L, input);
+          }
+        }, Avros.pairs(Avros.longs(), STRING_PTYPE));
+    List<Pair<Long, String>> pairs = Lists.newArrayList(stringWrapperCollection.materialize());
+    pipeline.done();
+    assertEquals(pairs.size(), 4);
+    assertEquals(Pair.of(1L, "a"), pairs.get(2));
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 86613df..8e9e069 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -123,8 +123,8 @@ class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K,
  public AvroTableType(AvroType<K> keyType, AvroType<V> valueType, Class<Pair<K, V>> pairClass) {
     super(pairClass, org.apache.avro.mapred.Pair.getPairSchema(keyType.getSchema(),
         valueType.getSchema()), new IndexedRecordToPair(keyType.getInputMapFn(),
-        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType), new TupleDeepCopier(
-        Pair.class, keyType, valueType), keyType, valueType);
+        valueType.getInputMapFn()), new PairToAvroPair(keyType, valueType),
+        new TupleDeepCopier(Pair.class, keyType, valueType), null, keyType, valueType);
     this.keyType = keyType;
     this.valueType = valueType;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
index aea4951..4e35b91 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroType.java
@@ -44,6 +44,12 @@ import com.google.common.collect.Lists;
  */
 public class AvroType<T> implements PType<T> {
 
+  public enum AvroRecordType {
+    REFLECT,
+    SPECIFIC,
+    GENERIC
+  }
+
   private static final Converter AVRO_CONVERTER = new AvroKeyConverter();
 
   private final Class<T> typeClass;
@@ -52,15 +58,16 @@ public class AvroType<T> implements PType<T> {
   private final MapFn baseInputMapFn;
   private final MapFn baseOutputMapFn;
   private final List<PType> subTypes;
+  private AvroRecordType recordType;
   private DeepCopier<T> deepCopier;
   private boolean initialized = false;
 
   public AvroType(Class<T> typeClass, Schema schema, DeepCopier<T> deepCopier, PType... ptypes) {
-    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, ptypes);
+    this(typeClass, schema, IdentityFn.getInstance(), IdentityFn.getInstance(), deepCopier, null, ptypes);
   }
 
   public AvroType(Class<T> typeClass, Schema schema, MapFn inputMapFn, MapFn outputMapFn,
-      DeepCopier<T> deepCopier, PType... ptypes) {
+      DeepCopier<T> deepCopier, AvroRecordType recordType, PType... ptypes) {
     this.typeClass = typeClass;
     this.schema = Preconditions.checkNotNull(schema);
     this.schemaString = schema.toString();
@@ -68,6 +75,23 @@ public class AvroType<T> implements PType<T> {
     this.baseOutputMapFn = outputMapFn;
     this.deepCopier = deepCopier;
     this.subTypes = ImmutableList.<PType> builder().add(ptypes).build();
+    this.recordType = recordType;
+  }
+
+  private AvroRecordType determineRecordType() {
+    if (checkReflect()) {
+      return AvroRecordType.REFLECT;
+    } else if (checkSpecific()) {
+      return AvroRecordType.SPECIFIC;
+    }
+    return AvroRecordType.GENERIC;
+  }
+
+  public AvroRecordType getRecordType() {
+    if (recordType == null) {
+      recordType = determineRecordType();
+    }
+    return recordType;
   }
 
   @Override
@@ -98,14 +122,17 @@ public class AvroType<T> implements PType<T> {
    * @return true if the wrapped type is a specific data type or wraps one
    */
   public boolean hasSpecific() {
-    if (Avros.isPrimitive(this)) {
+    return getRecordType() == AvroRecordType.SPECIFIC;
+  }
+
+  private boolean checkSpecific() {
+    if (Avros.isPrimitive(typeClass)) {
       return false;
     }
 
-    if (!this.subTypes.isEmpty()) {
-      for (PType<?> subType : this.subTypes) {
-        AvroType<?> atype = (AvroType<?>) subType;
-        if (atype.hasSpecific()) {
+    if (!subTypes.isEmpty()) {
+      for (PType<?> subType : subTypes) {
+        if (((AvroType<?>) subType).hasSpecific()) {
           return true;
         }
       }
@@ -130,12 +157,16 @@ public class AvroType<T> implements PType<T> {
    * @return true if the wrapped type is a reflection-based type or wraps one.
    */
   public boolean hasReflect() {
-    if (Avros.isPrimitive(this)) {
+    return getRecordType() == AvroRecordType.REFLECT;
+  }
+
+  private boolean checkReflect() {
+    if (Avros.isPrimitive(typeClass)) {
       return false;
     }
 
-    if (!this.subTypes.isEmpty()) {
-      for (PType<?> subType : this.subTypes) {
+    if (!subTypes.isEmpty()) {
+      for (PType<?> subType : subTypes) {
         if (((AvroType<?>) subType).hasReflect()) {
           return true;
         }

http://git-wip-us.apache.org/repos/asf/crunch/blob/5d06fd4a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index 8d6bdd6..3d6b04f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -180,7 +180,7 @@ public class Avros {
   };
 
   private static final AvroType<String> strings = new AvroType<String>(String.class, Schema.create(Schema.Type.STRING),
-      UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>());
+      UTF8_TO_STRING, STRING_TO_UTF8, new DeepCopier.NoOpDeepCopier<String>(), AvroType.AvroRecordType.GENERIC);
   private static final AvroType<Void> nulls = create(Void.class, Schema.Type.NULL);
   private static final AvroType<Long> longs = create(Long.class, Schema.Type.LONG);
   private static final AvroType<Integer> ints = create(Integer.class, Schema.Type.INT);
@@ -188,12 +188,12 @@ public class Avros {
   private static final AvroType<Double> doubles = create(Double.class, Schema.Type.DOUBLE);
   private static final AvroType<Boolean> booleans = create(Boolean.class, Schema.Type.BOOLEAN);
   private static final AvroType<ByteBuffer> bytes = new AvroType<ByteBuffer>(ByteBuffer.class,
-      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(), new DeepCopier.NoOpDeepCopier<ByteBuffer>());
+      Schema.create(Schema.Type.BYTES), BYTES_IN, IdentityFn.getInstance(),
+      new DeepCopier.NoOpDeepCopier<ByteBuffer>(), AvroType.AvroRecordType.GENERIC);
 
   private static final Map<Class<?>, PType<?>> PRIMITIVES = ImmutableMap.<Class<?>, PType<?>> builder()
       .put(String.class, strings).put(Long.class, longs).put(Integer.class, ints).put(Float.class, floats)
       .put(Double.class, doubles).put(Boolean.class, booleans).put(ByteBuffer.class, bytes).build();
-
   private static final Map<Class<?>, AvroType<?>> EXTENSIONS = Maps.newHashMap();
 
   public static <T> void register(Class<T> clazz, AvroType<T> ptype) {
@@ -208,6 +208,10 @@ public class Avros {
     return avroType.getTypeClass().isPrimitive() || PRIMITIVES.containsKey(avroType.getTypeClass());
   }
 
+  static <T> boolean isPrimitive(Class<T> typeClass) {
+    return typeClass.isPrimitive() || PRIMITIVES.containsKey(typeClass);
+  }
+
   private static <T> AvroType<T> create(Class<T> clazz, Schema.Type schemaType) {
     return new AvroType<T>(clazz, Schema.create(schemaType), new DeepCopier.NoOpDeepCopier<T>());
   }
@@ -315,7 +319,7 @@ public class Avros {
 
   public static final <T extends Writable> AvroType<T> writables(Class<T> clazz) {
     return new AvroType<T>(clazz, Schema.create(Schema.Type.BYTES), new BytesToWritableMapFn<T>(clazz),
-        new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz));
+        new WritableToBytesMapFn<T>(), new WritableDeepCopier<T>(clazz), AvroType.AvroRecordType.GENERIC);
   }
 
   private static class GenericDataArrayToCollection<T> extends MapFn<Object, Collection<T>> {
@@ -403,7 +407,8 @@ public class Avros {
     Schema collectionSchema = Schema.createArray(allowNulls(avroType.getSchema()));
     GenericDataArrayToCollection<T> input = new GenericDataArrayToCollection<T>(avroType.getInputMapFn());
     CollectionToGenericDataArray output = new CollectionToGenericDataArray(collectionSchema, avroType.getOutputMapFn());
-    return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype), ptype);
+    return new AvroType(Collection.class, collectionSchema, input, output, new CollectionDeepCopier<T>(ptype),
+        avroType.getRecordType(), ptype);
   }
 
   private static class AvroMapToMap<T> extends MapFn<Map<CharSequence, Object>, Map<String, T>> {
@@ -475,7 +480,8 @@ public class Avros {
     Schema mapSchema = Schema.createMap(allowNulls(avroType.getSchema()));
     AvroMapToMap<T> inputFn = new AvroMapToMap<T>(avroType.getInputMapFn());
     MapToAvroMap<T> outputFn = new MapToAvroMap<T>(avroType.getOutputMapFn());
-    return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype), ptype);
+    return new AvroType(Map.class, mapSchema, inputFn, outputFn, new MapDeepCopier<T>(ptype),
+        avroType.getRecordType(), ptype);
   }
 
   private static class GenericRecordToTuple extends MapFn<GenericRecord, Tuple> {
@@ -609,27 +615,27 @@ public class Avros {
     Schema schema = createTupleSchema(p1, p2);
     GenericRecordToTuple input = new GenericRecordToTuple(TupleFactory.PAIR, p1, p2);
     TupleToGenericRecord output = new TupleToGenericRecord(schema, p1, p2);
-    return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), p1, p2);
+    return new AvroType(Pair.class, schema, input, output, new TupleDeepCopier(Pair.class, p1, p2), null, p1, p2);
   }
 
   public static final <V1, V2, V3> AvroType<Tuple3<V1, V2, V3>> triples(PType<V1> p1, PType<V2> p2, PType<V3> p3) {
     Schema schema = createTupleSchema(p1, p2, p3);
     return new AvroType(Tuple3.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE3, p1, p2, p3),
-        new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), p1, p2, p3);
+        new TupleToGenericRecord(schema, p1, p2, p3), new TupleDeepCopier(Tuple3.class, p1, p2, p3), null, p1, p2, p3);
   }
 
   public static final <V1, V2, V3, V4> AvroType<Tuple4<V1, V2, V3, V4>> quads(PType<V1> p1, PType<V2> p2, PType<V3> p3,
       PType<V4> p4) {
     Schema schema = createTupleSchema(p1, p2, p3, p4);
     return new AvroType(Tuple4.class, schema, new GenericRecordToTuple(TupleFactory.TUPLE4, p1, p2, p3, p4),
-        new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), p1, p2,
-        p3, p4);
+        new TupleToGenericRecord(schema, p1, p2, p3, p4), new TupleDeepCopier(Tuple4.class, p1, p2, p3, p4), null,
+        p1, p2, p3, p4);
   }
 
   public static final AvroType<TupleN> tuples(PType... ptypes) {
     Schema schema = createTupleSchema(ptypes);
     return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN, ptypes),
-        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), ptypes);
+        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes), null, ptypes);
   }
 
   public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType... ptypes) {
@@ -640,7 +646,7 @@ public class Avros {
     }
     TupleFactory<T> factory = TupleFactory.create(clazz, typeArgs);
     return new AvroType<T>(clazz, schema, new GenericRecordToTuple(factory, ptypes), new TupleToGenericRecord(schema,
-        ptypes), new TupleDeepCopier(clazz, ptypes), ptypes);
+        ptypes), new TupleDeepCopier(clazz, ptypes), null, ptypes);
   }
 
   private static Schema createTupleSchema(PType<?>... ptypes) throws RuntimeException {
@@ -668,8 +674,8 @@ public class Avros {
       PType<S> base) {
     AvroType<S> abase = (AvroType<S>) base;
     return new AvroType<T>(clazz, abase.getSchema(), new CompositeMapFn(abase.getInputMapFn(), inputFn),
-        new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), base.getSubTypes()
-            .toArray(new PType[0]));
+        new CompositeMapFn(outputFn, abase.getOutputMapFn()), new DeepCopier.NoOpDeepCopier<T>(), abase.getRecordType(),
+        base.getSubTypes().toArray(new PType[0]));
   }
 
   public static <T> PType<T> jsons(Class<T> clazz) {


Mime
View raw message