incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject [2/2] Correct use of Avro reflection values in reducers
Date Thu, 12 Jul 2012 07:06:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/lib/SortTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/SortTest.java b/crunch/src/test/java/org/apache/crunch/lib/SortTest.java
index 8d2838f..981426b 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/SortTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/SortTest.java
@@ -20,17 +20,17 @@ package org.apache.crunch.lib;
 import static org.apache.crunch.lib.Sort.ColumnOrder.by;
 import static org.apache.crunch.lib.Sort.Order.ASCENDING;
 import static org.apache.crunch.lib.Sort.Order.DESCENDING;
+import static org.apache.crunch.test.StringWrapper.wrap;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
-
-import org.junit.Ignore;
-import org.junit.Test;
+import java.util.List;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
@@ -42,113 +42,163 @@ import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Sort.ColumnOrder;
 import org.apache.crunch.lib.Sort.Order;
 import org.apache.crunch.test.FileHelper;
+import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
 
 public class SortTest implements Serializable {
-  
+
   @Test
   public void testWritableSortAsc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        Order.ASCENDING, "A\tand this text as well");
+    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), Order.ASCENDING,
+        "A\tand this text as well");
   }
 
   @Test
   public void testWritableSortDesc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        Order.DESCENDING, "B\tthis doc has some text");
+    runSingle(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), Order.DESCENDING,
+        "B\tthis doc has some text");
   }
-  
+
   @Test
   public void testWritableSortAscDesc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), "A", "this doc has this text");
   }
 
   @Test
   public void testWritableSortSecondDescFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(2, DESCENDING),
+        by(1, ASCENDING), "A", "this doc has this text");
   }
 
   @Test
   public void testWritableSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+    runTriple(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
   }
 
   @Test
   public void testWritableSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this",
"doc", "has");
+    runQuad(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
   }
 
   @Test
   public void testWritableSortTupleNAscDesc() throws Exception {
-    runTupleN(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING)}, new String[] { "A", "this
doc has this text" });
+    runTupleN(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), new ColumnOrder[]
{
+        by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this doc has this text"
});
   }
 
   @Test
   public void testWritableSortTable() throws Exception {
-    runTable(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(),
-        "A");
+    runTable(new MRPipeline(SortTest.class), WritableTypeFamily.getInstance(), "A");
   }
-  
+
   @Test
   public void testAvroSortAsc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        Order.ASCENDING, "A\tand this text as well");
+    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), Order.ASCENDING,
+        "A\tand this text as well");
   }
-  
+
   @Test
   public void testAvroSortDesc() throws Exception {
-    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        Order.DESCENDING, "B\tthis doc has some text");
+    runSingle(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), Order.DESCENDING,
+        "B\tthis doc has some text");
   }
-  
+
   @Test
   public void testAvroSortPairAscAsc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), "A", "this doc has this text");
   }
-  
+
   @Test
   @Ignore("Avro sorting only works in field order at the moment")
   public void testAvroSortPairSecondAscFirstDesc() throws Exception {
-    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(2, DESCENDING), by(1, ASCENDING), "A", "this doc has this text");
+    runPair(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(2, DESCENDING),
+        by(1, ASCENDING), "A", "this doc has this text");
   }
-  
+
   @Test
   public void testAvroSortTripleAscDescAsc() throws Exception {
-    runTriple(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
+    runTriple(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), "A", "this", "doc");
   }
 
   @Test
   public void testAvroSortQuadAscDescAscDesc() throws Exception {
-    runQuad(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        by(1, ASCENDING), by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this",
"doc", "has");
+    runQuad(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), by(1, ASCENDING),
+        by(2, DESCENDING), by(3, ASCENDING), by(4, DESCENDING), "A", "this", "doc", "has");
   }
 
   @Test
   public void testAvroSortTupleNAscDesc() throws Exception {
     runTupleN(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(),
-        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A", "this
doc has this text" });
+        new ColumnOrder[] { by(1, ASCENDING), by(2, DESCENDING) }, new String[] { "A",
+            "this doc has this text" });
+  }
+
+  @Test
+  public void testAvroReflectSortPair() throws IOException {
+    Pipeline pipeline = new MRPipeline(SortTest.class);
+    PCollection<Pair<String, StringWrapper>> sorted = pipeline
+        .readTextFile(FileHelper.createTempCopyOf("set2.txt"))
+        .parallelDo(new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.pairs(Avros.strings(), Avros.reflects(StringWrapper.class))).sort(true);
+
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
+  }
+
+  @Test
+  public void testAvroReflectSortTable() throws IOException {
+    Pipeline pipeline = new MRPipeline(SortTest.class);
+    PTable<String, StringWrapper> unsorted = pipeline.readTextFile(
+        FileHelper.createTempCopyOf("set2.txt")).parallelDo(
+        new MapFn<String, Pair<String, StringWrapper>>() {
+
+          @Override
+          public Pair<String, StringWrapper> map(String input) {
+            return Pair.of(input, wrap(input));
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.reflects(StringWrapper.class)));
+
+    PTable<String, StringWrapper> sorted = Sort.sort(unsorted);
+
+    List<Pair<String, StringWrapper>> expected = Lists.newArrayList();
+    expected.add(Pair.of("a", wrap("a")));
+    expected.add(Pair.of("c", wrap("c")));
+    expected.add(Pair.of("d", wrap("d")));
+
+    assertEquals(expected, Lists.newArrayList(sorted.materialize()));
   }
-  
+
   @Test
   public void testAvroSortTable() throws Exception {
     runTable(new MRPipeline(SortTest.class), AvroTypeFamily.getInstance(), "A");
   }
 
-  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily,
-      Order order, String firstLine) throws IOException {
+  private void runSingle(Pipeline pipeline, PTypeFamily typeFamily, Order order, String firstLine)
+      throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
+
     PCollection<String> input = pipeline.readTextFile(inputPath);
     // following turns the input from Writables to required type family
     PCollection<String> input2 = input.parallelDo(new DoFn<String, String>()
{
@@ -159,24 +209,24 @@ public class SortTest implements Serializable {
     }, typeFamily.strings());
     PCollection<String> sorted = Sort.sort(input2, order);
     Iterable<String> lines = sorted.materialize();
-    
+
     assertEquals(firstLine, lines.iterator().next());
     pipeline.done(); // TODO: finally
   }
-  
-  private void runPair(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder first, ColumnOrder second, String firstField, String secondField) throws
IOException {
+
+  private void runPair(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
+      ColumnOrder second, String firstField, String secondField) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
+
     PCollection<String> input = pipeline.readTextFile(inputPath);
     PCollection<Pair<String, String>> kv = input.parallelDo(
-      new DoFn<String, Pair<String, String>>() {
-        @Override
-        public void process(String input, Emitter<Pair<String, String>> emitter)
{
-          String[] split = input.split("[\t]+");
-          emitter.emit(Pair.of(split[0], split[1]));
-        }
-    }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
+        new DoFn<String, Pair<String, String>>() {
+          @Override
+          public void process(String input, Emitter<Pair<String, String>> emitter)
{
+            String[] split = input.split("[\t]+");
+            emitter.emit(Pair.of(split[0], split[1]));
+          }
+        }, typeFamily.pairs(typeFamily.strings(), typeFamily.strings()));
     PCollection<Pair<String, String>> sorted = Sort.sortPairs(kv, first, second);
     Iterable<Pair<String, String>> lines = sorted.materialize();
     Pair<String, String> l = lines.iterator().next();
@@ -184,21 +234,22 @@ public class SortTest implements Serializable {
     assertEquals(secondField, l.second());
     pipeline.done();
   }
-  
-  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder first, ColumnOrder second, ColumnOrder third, String firstField, String
secondField, String thirdField) throws IOException {
+
+  private void runTriple(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
+      ColumnOrder second, ColumnOrder third, String firstField, String secondField,
+      String thirdField) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
+
     PCollection<String> input = pipeline.readTextFile(inputPath);
     PCollection<Tuple3<String, String, String>> kv = input.parallelDo(
-      new DoFn<String, Tuple3<String, String, String>>() {
-        @Override
-        public void process(String input, Emitter<Tuple3<String, String, String>>
emitter) {
-          String[] split = input.split("[\t ]+");
-          int len = split.length;
-          emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
-        }
-    }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
+        new DoFn<String, Tuple3<String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple3<String, String, String>>
emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
+            emitter.emit(Tuple3.of(split[0], split[1 % len], split[2 % len]));
+          }
+        }, typeFamily.triples(typeFamily.strings(), typeFamily.strings(), typeFamily.strings()));
     PCollection<Tuple3<String, String, String>> sorted = Sort.sortTriples(kv,
first, second, third);
     Iterable<Tuple3<String, String, String>> lines = sorted.materialize();
     Tuple3<String, String, String> l = lines.iterator().next();
@@ -207,23 +258,25 @@ public class SortTest implements Serializable {
     assertEquals(thirdField, l.third());
     pipeline.done();
   }
-  
-  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder first, ColumnOrder second, ColumnOrder third, ColumnOrder fourth,
-      String firstField, String secondField, String thirdField, String fourthField) throws
IOException {
+
+  private void runQuad(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder first,
+      ColumnOrder second, ColumnOrder third, ColumnOrder fourth, String firstField,
+      String secondField, String thirdField, String fourthField) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
+
     PCollection<String> input = pipeline.readTextFile(inputPath);
     PCollection<Tuple4<String, String, String, String>> kv = input.parallelDo(
-      new DoFn<String, Tuple4<String, String, String, String>>() {
-        @Override
-        public void process(String input, Emitter<Tuple4<String, String, String, String>>
emitter) {
-          String[] split = input.split("[\t ]+");
-          int len = split.length;
-          emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
-        }
-    }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(),
typeFamily.strings()));
-    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv,
first, second, third, fourth);
+        new DoFn<String, Tuple4<String, String, String, String>>() {
+          @Override
+          public void process(String input, Emitter<Tuple4<String, String, String,
String>> emitter) {
+            String[] split = input.split("[\t ]+");
+            int len = split.length;
+            emitter.emit(Tuple4.of(split[0], split[1 % len], split[2 % len], split[3 % len]));
+          }
+        }, typeFamily.quads(typeFamily.strings(), typeFamily.strings(), typeFamily.strings(),
+            typeFamily.strings()));
+    PCollection<Tuple4<String, String, String, String>> sorted = Sort.sortQuads(kv,
first, second,
+        third, fourth);
     Iterable<Tuple4<String, String, String, String>> lines = sorted.materialize();
     Tuple4<String, String, String, String> l = lines.iterator().next();
     assertEquals(firstField, l.first());
@@ -232,46 +285,44 @@ public class SortTest implements Serializable {
     assertEquals(fourthField, l.fourth());
     pipeline.done();
   }
-  
-  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily,
-      ColumnOrder[] orders, String[] fields) throws IOException {
+
+  private void runTupleN(Pipeline pipeline, PTypeFamily typeFamily, ColumnOrder[] orders,
+      String[] fields) throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
+
     PCollection<String> input = pipeline.readTextFile(inputPath);
     PType[] types = new PType[orders.length];
     Arrays.fill(types, typeFamily.strings());
-    PCollection<TupleN> kv = input.parallelDo(
-      new DoFn<String, TupleN>() {
-        @Override
-        public void process(String input, Emitter<TupleN> emitter) {
-          String[] split = input.split("[\t]+");
-          emitter.emit(new TupleN(split));
-        }
+    PCollection<TupleN> kv = input.parallelDo(new DoFn<String, TupleN>() {
+      @Override
+      public void process(String input, Emitter<TupleN> emitter) {
+        String[] split = input.split("[\t]+");
+        emitter.emit(new TupleN(split));
+      }
     }, typeFamily.tuples(types));
     PCollection<TupleN> sorted = Sort.sortTuples(kv, orders);
     Iterable<TupleN> lines = sorted.materialize();
     TupleN l = lines.iterator().next();
     int i = 0;
     for (String field : fields) {
-      assertEquals(field, l.get(i++));      
+      assertEquals(field, l.get(i++));
     }
     pipeline.done();
   }
 
-  private void runTable(Pipeline pipeline, PTypeFamily typeFamily,
-      String firstKey) throws IOException {
+  private void runTable(Pipeline pipeline, PTypeFamily typeFamily, String firstKey)
+      throws IOException {
     String inputPath = FileHelper.createTempCopyOf("docs.txt");
-    
+
     PCollection<String> input = pipeline.readTextFile(inputPath);
-    PTable<String, String> table = input.parallelDo(
-        new DoFn<String, Pair<String, String>>() {
-          @Override
-          public void process(String input, Emitter<Pair<String, String>> emitter)
{
-            String[] split = input.split("[\t]+");
-            emitter.emit(Pair.of(split[0], split[1]));
-          }
-      }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
-    
+    PTable<String, String> table = input.parallelDo(new DoFn<String, Pair<String,
String>>() {
+      @Override
+      public void process(String input, Emitter<Pair<String, String>> emitter)
{
+        String[] split = input.split("[\t]+");
+        emitter.emit(Pair.of(split[0], split[1]));
+      }
+    }, typeFamily.tableOf(typeFamily.strings(), typeFamily.strings()));
+
     PTable<String, String> sorted = Sort.sort(table);
     Iterable<Pair<String, String>> lines = sorted.materialize();
     Pair<String, String> l = lines.iterator().next();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java b/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java
new file mode 100644
index 0000000..5b3c4c4
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/test/StringWrapper.java
@@ -0,0 +1,85 @@
+package org.apache.crunch.test;
+
+import org.apache.crunch.MapFn;
+
+/**
+ * Simple String wrapper for testing with Avro reflection.
+ */
+public class StringWrapper implements Comparable<StringWrapper> {
+
+  public static class StringToStringWrapperMapFn extends MapFn<String, StringWrapper>
{
+
+    @Override
+    public StringWrapper map(String input) {
+      return wrap(input);
+    }
+
+  }
+
+  public static class StringWrapperToStringMapFn extends MapFn<StringWrapper, String>
{
+
+    @Override
+    public String map(StringWrapper input) {
+      return input.getValue();
+    }
+
+  }
+
+  private String value;
+
+  public StringWrapper() {
+    this("");
+  }
+
+  public StringWrapper(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public int compareTo(StringWrapper o) {
+    return this.value.compareTo(o.value);
+  }
+
+  public String getValue() {
+    return value;
+  }
+
+  public void setValue(String value) {
+    this.value = value;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    StringWrapper other = (StringWrapper) obj;
+    if (value == null) {
+      if (other.value != null)
+        return false;
+    } else if (!value.equals(other.value))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "StringWrapper [value=" + value + "]";
+  }
+
+  public static StringWrapper wrap(String value) {
+    return new StringWrapper(value);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
index fa9da1a..9917685 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTableTypeTest.java
@@ -18,13 +18,16 @@
 package org.apache.crunch.types.avro;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertSame;
-
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
+import org.junit.Test;
+
 import com.google.common.collect.Lists;
 
 public class AvroTableTypeTest {
@@ -49,4 +52,19 @@ public class AvroTableTypeTest {
     assertNotSame(person, detachedPair.second());
   }
 
+  @Test
+  public void testIsReflect_ContainsReflectKey() {
+    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_ContainsReflectValue() {
+    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect());
+  }
+
+  @Test
+  public void testReflect_NoReflectKeyOrValue() {
+    assertFalse(Avros.tableOf(Avros.ints(), Avros.ints()).isReflect());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
index 2bebca1..2a80a5e 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvroTypeTest.java
@@ -25,66 +25,113 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.crunch.test.Person;
+import org.apache.crunch.test.StringWrapper;
 import org.junit.Test;
 
-import org.apache.crunch.test.Person;
 import com.google.common.collect.Lists;
 
 public class AvroTypeTest {
 
-	@Test
-	public void testIsSpecific_SpecificData() {
-		assertTrue(Avros.records(Person.class).isSpecific());
-	}
-
-	@Test
-	public void testIsGeneric_SpecificData() {
-		assertFalse(Avros.records(Person.class).isGeneric());
-	}
-
-	@Test
-	public void testIsSpecific_GenericData() {
-		assertFalse(Avros.generics(Person.SCHEMA$).isSpecific());
-	}
-
-	@Test
-	public void testIsGeneric_GenericData() {
-		assertTrue(Avros.generics(Person.SCHEMA$).isGeneric());
-	}
-
-	@Test
-	public void testIsSpecific_NonAvroClass() {
-		assertFalse(Avros.ints().isSpecific());
-	}
-
-	@Test
-	public void testIsGeneric_NonAvroClass() {
-		assertFalse(Avros.ints().isGeneric());
-	}
-
-	@Test
-	public void testIsSpecific_SpecificAvroTable() {
-		assertTrue(Avros.tableOf(Avros.strings(), Avros.records(Person.class))
-				.isSpecific());
-	}
-
-	@Test
-	public void testIsGeneric_SpecificAvroTable() {
-		assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class))
-				.isGeneric());
-	}
-
-	@Test
-	public void testIsSpecific_GenericAvroTable() {
-		assertFalse(Avros.tableOf(Avros.strings(),
-				Avros.generics(Person.SCHEMA$)).isSpecific());
-	}
-
-	@Test
-	public void testIsGeneric_GenericAvroTable() {
-		assertTrue(Avros.tableOf(Avros.strings(),
-				Avros.generics(Person.SCHEMA$)).isGeneric());
-	}
+  @Test
+  public void testIsSpecific_SpecificData() {
+    assertTrue(Avros.records(Person.class).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_SpecificData() {
+    assertFalse(Avros.records(Person.class).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_GenericData() {
+    assertFalse(Avros.generics(Person.SCHEMA$).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_GenericData() {
+    assertTrue(Avros.generics(Person.SCHEMA$).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_NonAvroClass() {
+    assertFalse(Avros.ints().isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_NonAvroClass() {
+    assertFalse(Avros.ints().isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_SpecificAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_SpecificAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.records(Person.class)).isGeneric());
+  }
+
+  @Test
+  public void testIsSpecific_GenericAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isSpecific());
+  }
+
+  @Test
+  public void testIsGeneric_GenericAvroTable() {
+    assertFalse(Avros.tableOf(Avros.strings(), Avros.generics(Person.SCHEMA$)).isGeneric());
+  }
+
+  @Test
+  public void testIsReflect_GenericType() {
+    assertFalse(Avros.generics(Person.SCHEMA$).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_SpecificType() {
+    assertFalse(Avros.records(Person.class).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_ReflectSimpleType() {
+    assertTrue(Avros.reflects(StringWrapper.class).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_NonReflectSubType() {
+    assertFalse(Avros.pairs(Avros.ints(), Avros.ints()).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_ReflectSubType() {
+    assertTrue(Avros.pairs(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_TableOfNonReflectTypes() {
+    assertFalse(Avros.tableOf(Avros.ints(), Avros.strings()).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_TableWithReflectKey() {
+    assertTrue(Avros.tableOf(Avros.reflects(StringWrapper.class), Avros.ints()).isReflect());
+  }
+
+  @Test
+  public void testIsReflect_TableWithReflectValue() {
+    assertTrue(Avros.tableOf(Avros.ints(), Avros.reflects(StringWrapper.class)).isReflect());
+  }
+
+  @Test
+  public void testReflect_CollectionContainingReflectValue() {
+    assertTrue(Avros.collections(Avros.reflects(StringWrapper.class)).isReflect());
+  }
+
+  @Test
+  public void testReflect_CollectionNotContainingReflectValue() {
+    assertFalse(Avros.collections(Avros.generics(Person.SCHEMA$)).isReflect());
+  }
 
   @Test
   public void testGetDetachedValue_AlreadyMappedAvroType() {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/f699409f/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
index d9a2735..c71207b 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -24,14 +24,14 @@ import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.util.Utf8;
-import org.apache.hadoop.io.LongWritable;
-import org.junit.Test;
-
 import org.apache.crunch.Pair;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
@@ -39,6 +39,10 @@ import org.apache.crunch.TupleN;
 import org.apache.crunch.test.Person;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
@@ -52,41 +56,44 @@ public class AvrosTest {
     Void n = null;
     testInputOutputFn(Avros.nulls(), n, n);
   }
-  
+
   @Test
   public void testStrings() throws Exception {
     String s = "abc";
     Utf8 w = new Utf8(s);
     testInputOutputFn(Avros.strings(), s, w);
   }
-  
+
   @Test
   public void testInts() throws Exception {
     int j = 55;
     testInputOutputFn(Avros.ints(), j, j);
   }
+
   @Test
   public void testLongs() throws Exception {
     long j = Long.MAX_VALUE;
     testInputOutputFn(Avros.longs(), j, j);
   }
+
   @Test
   public void testFloats() throws Exception {
     float j = Float.MIN_VALUE;
     testInputOutputFn(Avros.floats(), j, j);
   }
+
   @Test
   public void testDoubles() throws Exception {
     double j = Double.MIN_VALUE;
     testInputOutputFn(Avros.doubles(), j, j);
   }
-  
+
   @Test
   public void testBooleans() throws Exception {
     boolean j = true;
     testInputOutputFn(Avros.booleans(), j, j);
   }
-  
+
   @Test
   public void testBytes() throws Exception {
     byte[] bytes = new byte[] { 17, 26, -98 };
@@ -99,22 +106,21 @@ public class AvrosTest {
     Collection<String> j = Lists.newArrayList();
     j.add("a");
     j.add("b");
-    Schema collectionSchema = Schema.createArray(
-        Schema.createUnion(ImmutableList.of(
-            Avros.strings().getSchema(), Schema.create(Type.NULL))));
+    Schema collectionSchema = Schema.createArray(Schema.createUnion(ImmutableList.of(Avros
+        .strings().getSchema(), Schema.create(Type.NULL))));
     GenericData.Array<Utf8> w = new GenericData.Array<Utf8>(2, collectionSchema);
     w.add(new Utf8("a"));
     w.add(new Utf8("b"));
     testInputOutputFn(Avros.collections(Avros.strings()), j, w);
   }
-  
+
   @Test
   public void testNestedTables() throws Exception {
     PTableType<Long, Long> pll = Avros.tableOf(Avros.longs(), Avros.longs());
     String schema = Avros.tableOf(pll, Avros.strings()).getSchema().toString();
     assertNotNull(schema);
   }
-  
+
   @Test
   public void testPairs() throws Exception {
     AvroType<Pair<String, String>> at = Avros.pairs(Avros.strings(), Avros.strings());
@@ -124,15 +130,15 @@ public class AvrosTest {
     w.put(1, new Utf8("b"));
     testInputOutputFn(at, j, w);
   }
-  
+
   @Test
   public void testPairEquals() throws Exception {
-	AvroType<Pair<Long, ByteBuffer>> at1 = Avros.pairs(Avros.longs(), Avros.bytes());
-	AvroType<Pair<Long, ByteBuffer>> at2 = Avros.pairs(Avros.longs(), Avros.bytes());
-	assertEquals(at1, at2);
-	assertEquals(at1.hashCode(), at2.hashCode());
+    AvroType<Pair<Long, ByteBuffer>> at1 = Avros.pairs(Avros.longs(), Avros.bytes());
+    AvroType<Pair<Long, ByteBuffer>> at2 = Avros.pairs(Avros.longs(), Avros.bytes());
+    assertEquals(at1, at2);
+    assertEquals(at1.hashCode(), at2.hashCode());
   }
-  
+
   @Test
   @SuppressWarnings("rawtypes")
   public void testTriples() throws Exception {
@@ -144,7 +150,7 @@ public class AvrosTest {
     w.put(2, new Utf8("c"));
     testInputOutputFn(at, j, w);
   }
-  
+
   @Test
   @SuppressWarnings("rawtypes")
   public void testQuads() throws Exception {
@@ -157,7 +163,7 @@ public class AvrosTest {
     w.put(3, new Utf8("d"));
     testInputOutputFn(at, j, w);
   }
-  
+
   @Test
   @SuppressWarnings("rawtypes")
   public void testTupleN() throws Exception {
@@ -171,9 +177,9 @@ public class AvrosTest {
     w.put(3, new Utf8("d"));
     w.put(4, new Utf8("e"));
     testInputOutputFn(at, j, w);
-    
+
   }
-   
+
   @Test
   @SuppressWarnings("rawtypes")
   public void testWritables() throws Exception {
@@ -181,7 +187,7 @@ public class AvrosTest {
     LongWritable lw = new LongWritable(1729L);
     assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw)));
   }
-  
+
   @Test
   @SuppressWarnings("rawtypes")
   public void testTableOf() throws Exception {
@@ -193,18 +199,18 @@ public class AvrosTest {
     // TODO update this after resolving the o.a.a.m.Pair.equals issue
     initialize(at);
     assertEquals(j, at.getInputMapFn().map(w));
-    org.apache.avro.mapred.Pair converted =
-        (org.apache.avro.mapred.Pair) at.getOutputMapFn().map(j);
+    org.apache.avro.mapred.Pair converted = (org.apache.avro.mapred.Pair) at.getOutputMapFn()
+        .map(j);
     assertEquals(w.key(), converted.key());
     assertEquals(w.value(), converted.value());
   }
-  
+
   private static void initialize(PType ptype) {
     ptype.getInputMapFn().initialize();
     ptype.getOutputMapFn().initialize();
   }
-  
-  @SuppressWarnings({"unchecked", "rawtypes"})
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   protected static void testInputOutputFn(PType ptype, Object java, Object avro) {
     initialize(ptype);
     assertEquals(java, ptype.getInputMapFn().map(avro));
@@ -221,4 +227,67 @@ public class AvrosTest {
     assertFalse(Avros.isPrimitive(Avros.reflects(Person.class)));
   }
 
+  @Test
+  public void testPairs_Generic() {
+    Schema schema = ReflectData.get().getSchema(IntWritable.class);
+
+    GenericData.Record recordA = new GenericData.Record(schema);
+    GenericData.Record recordB = new GenericData.Record(schema);
+
+    AvroType<Pair<Record, Record>> pairType = Avros.pairs(Avros.generics(schema),
+        Avros.generics(schema));
+    Pair<Record, Record> pair = Pair.of(recordA, recordB);
+    pairType.getOutputMapFn().initialize();
+    pairType.getInputMapFn().initialize();
+    Object mapped = pairType.getOutputMapFn().map(pair);
+    Pair<Record, Record> doubleMappedPair = pairType.getInputMapFn().map(mapped);
+
+    assertEquals(pair, doubleMappedPair);
+    mapped.hashCode();
+  }
+
+  @Test
+  public void testPairs_Reflect() {
+    IntWritable intWritableA = new IntWritable(1);
+    IntWritable intWritableB = new IntWritable(2);
+
+    AvroType<Pair<IntWritable, IntWritable>> pairType = Avros.pairs(
+        Avros.reflects(IntWritable.class), Avros.reflects(IntWritable.class));
+    Pair<IntWritable, IntWritable> pair = Pair.of(intWritableA, intWritableB);
+    pairType.getOutputMapFn().initialize();
+    pairType.getInputMapFn().initialize();
+    Object mapped = pairType.getOutputMapFn().map(pair);
+
+    Pair<IntWritable, IntWritable> doubleMappedPair = pairType.getInputMapFn().map(mapped);
+
+    assertEquals(pair, doubleMappedPair);
+  }
+
+  @Test
+  public void testPairs_Specific() {
+    Person personA = new Person();
+    Person personB = new Person();
+
+    personA.setAge(1);
+    personA.setName("A");
+    personA.setSiblingnames(Collections.<CharSequence> emptyList());
+
+    personB.setAge(2);
+    personB.setName("B");
+    personB.setSiblingnames(Collections.<CharSequence> emptyList());
+
+    AvroType<Pair<Person, Person>> pairType = Avros.pairs(Avros.records(Person.class),
+        Avros.records(Person.class));
+
+    Pair<Person, Person> pair = Pair.of(personA, personB);
+    pairType.getOutputMapFn().initialize();
+    pairType.getInputMapFn().initialize();
+
+    Object mapped = pairType.getOutputMapFn().map(pair);
+    Pair<Person, Person> doubleMappedPair = pairType.getInputMapFn().map(mapped);
+
+    assertEquals(pair, doubleMappedPair);
+
+  }
+
 }


Mime
View raw message