crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-448: Support for serializing arbitrary case classes, optionally as fully-specified Avro records.
Date Wed, 30 Jul 2014 04:28:07 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 3f13ee65c -> 71d59b691


CRUNCH-448: Support for serializing arbitrary case classes, optionally as
fully-specified Avro records.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/71d59b69
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/71d59b69
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/71d59b69

Branch: refs/heads/master
Commit: 71d59b6910747d98eecbde5492c04db1226a8062
Parents: 3f13ee6
Author: Josh Wills <jwills@apache.org>
Authored: Fri Jul 11 15:59:39 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Jul 29 21:17:33 2014 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/TupleN.java |   2 +
 .../org/apache/crunch/types/avro/Avros.java     |  41 ++++-
 .../crunch/scrunch/PageRankClassTest.scala      |  10 +-
 .../apache/crunch/scrunch/ScalaTypesTest.scala  |  22 +++
 .../org/apache/crunch/scrunch/Conversions.scala |  23 ++-
 .../org/apache/crunch/scrunch/PTypeFamily.scala | 158 +++++++++++++++++--
 .../org/apache/crunch/scrunch/TupleNTest.scala  |  24 +++
 7 files changed, 260 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/71d59b69/crunch-core/src/main/java/org/apache/crunch/TupleN.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/TupleN.java b/crunch-core/src/main/java/org/apache/crunch/TupleN.java
index aaf988a..884b0b6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/TupleN.java
+++ b/crunch-core/src/main/java/org/apache/crunch/TupleN.java
@@ -37,6 +37,8 @@ public class TupleN implements Tuple {
     System.arraycopy(values, 0, this.values, 0, values.length);
   }
 
+  public Object[] getValues() { return values; }
+
   @Override
   public Object get(int index) {
     return values[index];

http://git-wip-us.apache.org/repos/asf/crunch/blob/71d59b69/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
index f950145..d6065f9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -646,6 +647,14 @@ public class Avros {
         new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes),
null, ptypes);
   }
 
+  public static final AvroType<TupleN> namedTuples(String tupleName, String[] fieldNames,
PType[] ptypes) {
+    Preconditions.checkArgument(fieldNames.length == ptypes.length,
+        "Number of field names must match number of ptypes");
+    Schema schema = createTupleSchema(tupleName, fieldNames, ptypes);
+    return new AvroType(TupleN.class, schema, new GenericRecordToTuple(TupleFactory.TUPLEN,
ptypes),
+        new TupleToGenericRecord(schema, ptypes), new TupleDeepCopier(TupleN.class, ptypes),
null, ptypes);
+  }
+
   public static <T extends Tuple> AvroType<T> tuples(Class<T> clazz, PType...
ptypes) {
     Schema schema = createTupleSchema(ptypes);
     Class[] typeArgs = new Class[ptypes.length];
@@ -799,7 +808,19 @@ public class Avros {
         new TupleToUnionRecord(schema, ptypes), new UnionDeepCopier(ptypes), null, ptypes);
   }
 
+  private static String[] fieldNames(int len) {
+    String[] ret = new String[len];
+    for (int i = 0; i < ret.length; i++) {
+      ret[i]= "v" + i;
+    }
+    return ret;
+  }
+
   private static Schema createTupleSchema(PType<?>... ptypes) throws RuntimeException
{
+    return createTupleSchema("", fieldNames(ptypes.length), ptypes);
+  }
+
+  private static Schema createTupleSchema(String tupleName, String[] fieldNames, PType<?>[]
ptypes) throws RuntimeException {
     // Guarantee each tuple schema has a globally unique name
     List<Schema.Field> fields = Lists.newArrayList();
     MessageDigest md;
@@ -811,11 +832,25 @@ public class Avros {
     for (int i = 0; i < ptypes.length; i++) {
       AvroType atype = (AvroType) ptypes[i];
       Schema fieldSchema = allowNulls(atype.getSchema());
-      fields.add(new Schema.Field("v" + i, fieldSchema, "", null));
+      fields.add(new Schema.Field(fieldNames[i], fieldSchema, "", null));
+      md.update(fieldNames[i].getBytes(Charsets.UTF_8));
       md.update(fieldSchema.toString().getBytes(Charsets.UTF_8));
     }
-    String schemaName = "tuple" + Base64.encodeBase64URLSafeString(md.digest()).replace('-',
'x');
-    Schema schema = Schema.createRecord(schemaName, "", "crunch", false);
+    String schemaName, schemaNamespace;
+    if (tupleName.isEmpty()) {
+      schemaName = "tuple" + Base64.encodeBase64URLSafeString(md.digest()).replace('-', 'x');
+      schemaNamespace = "crunch";
+    } else {
+      int splitIndex = tupleName.lastIndexOf('.');
+      if (splitIndex == -1) {
+        schemaName = tupleName;
+        schemaNamespace = "crunch";
+      } else {
+        schemaName = tupleName.substring(splitIndex + 1);
+        schemaNamespace = tupleName.substring(0, splitIndex);
+      }
+    }
+    Schema schema = Schema.createRecord(schemaName, "", schemaNamespace, false);
     schema.setFields(fields);
     return schema;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/71d59b69/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
index f7ccf1a..1b9cd26 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
@@ -27,8 +27,10 @@ import scala.collection.mutable.HashMap
 import _root_.org.junit.Assert._
 import _root_.org.junit.Test
 
-case class PageRankData(page_rank: Float, oldpr: Float, urls: Array[String], bytes: Array[Byte])
{
-  def this() = this(0f, 0f, null, Array[Byte](0))
+class PageRankData(val page_rank: Float, oldpr: Float, val urls: Array[String], bytes: Array[Byte])
{
+
+  // Required no-arg constructor for Avro reflection
+  def this() = this(0.0f, 0.0f, null, null)
 
   def scaledPageRank = page_rank / urls.length
 
@@ -67,7 +69,7 @@ class PageRankClassTest extends CrunchSuite {
     pipeline.read(from.textFile(fileName, Avros.strings))
       .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
       .groupByKey
-      .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray,
Array[Byte](0))))
+      .map((url, links) => (url, new PageRankData(1f, 0f, links.filter(x => x != null).toArray,
Array[Byte](0))))
   }
 
   def update(prev: PTable[String, PageRankData], d: Float) = {
@@ -102,7 +104,7 @@ class PageRankClassTest extends CrunchSuite {
     pipeline.done
   }
 
-  def testFastPageRank {
+  @Test def testFastPageRank {
     var prev = initialInput(tempDir.copyResourceFileName("urls.txt"))
     var delta = 1.0f
     while (delta > 0.01f) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/71d59b69/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
index e4dc771..4382ca9 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
@@ -29,6 +29,8 @@ object ScalaTypesTest {
   }
 }
 
+case class MyCaseClass(name: List[String], gender: Option[String], age: Int, birthday: Long)
+
 class ScalaTypesTest extends CrunchSuite {
   import ScalaTypesTest._
 
@@ -58,4 +60,24 @@ class ScalaTypesTest extends CrunchSuite {
     assert(out.exists(_.isLeft))
     assert(out.exists(_.isRight))
   }
+
+  @Test
+  def product {
+    val pt = Avros.caseClasses[MyCaseClass]
+    pt.getInputMapFn.initialize()
+    pt.getOutputMapFn.initialize()
+    val cc = MyCaseClass(List("Josh", "Wills"), Some("Male"), 35, 1234L)
+    val ser = pt.getOutputMapFn.map(cc)
+    assert(cc == pt.getInputMapFn.map(ser))
+  }
+
+  @Test
+  def productWithNulls {
+    val pt = Avros.caseClasses[MyCaseClass]
+    pt.getInputMapFn.initialize()
+    pt.getOutputMapFn.initialize()
+    val cc = MyCaseClass(List("Josh", "Wills"), null, 35, 1234L)
+    val ser = pt.getOutputMapFn.map(cc)
+    assert(cc == pt.getInputMapFn.map(ser))
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/71d59b69/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
index ffd85c6..4dec8cf 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
@@ -23,6 +23,7 @@ import org.apache.crunch.types.{PTypes, PType}
 import java.nio.ByteBuffer
 import scala.collection.Iterable
 import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.TaskInputOutputContext
 import com.google.protobuf.Message
@@ -86,12 +87,18 @@ trait PTypeH[T] extends Serializable {
   def get(ptf: PTypeFamily): PType[T]
 }
 
-trait LowPriorityPTypeH {
+trait VeryLowPriorityPTypeH {
   implicit def records[T <: AnyRef : ClassTag] = new PTypeH[T] {
     def get(ptf: PTypeFamily) = ptf.records(implicitly[ClassTag[T]]).asInstanceOf[PType[T]]
   }
 }
 
+trait LowPriorityPTypeH extends VeryLowPriorityPTypeH {
+  implicit def caseClasses[T <: Product: TypeTag] = new PTypeH[T] {
+    override def get(ptf: PTypeFamily): PType[T] = ptf.caseClasses[T]
+  }
+}
+
 object PTypeH extends GeneratedTupleConversions with LowPriorityPTypeH {
 
   implicit val longs = new PTypeH[Long] { def get(ptf: PTypeFamily) = ptf.longs }
@@ -143,6 +150,14 @@ object PTypeH extends GeneratedTupleConversions with LowPriorityPTypeH {
     }
   }
 
+  implicit def arrays[T: PTypeH] = {
+    new PTypeH[Array[T]] {
+      def get(ptf: PTypeFamily) = {
+        ptf.arrays[T](implicitly[PTypeH[T]].get(ptf))
+      }
+    }
+  }
+
   implicit def collections[T: PTypeH] = {
     new PTypeH[Iterable[T]] {
       def get(ptf: PTypeFamily) = {
@@ -167,10 +182,10 @@ object PTypeH extends GeneratedTupleConversions with LowPriorityPTypeH {
     }
   }
 
-  implicit def maps[T: PTypeH] = {
-    new PTypeH[Map[String, T]] {
+  implicit def maps[K: PTypeH, V: PTypeH] = {
+    new PTypeH[Map[K, V]] {
       def get(ptf: PTypeFamily) = {
-        ptf.maps(implicitly[PTypeH[T]].get(ptf))
+        ptf.maps(implicitly[PTypeH[K]].get(ptf), implicitly[PTypeH[V]].get(ptf))
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/71d59b69/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index f77067a..1157a34 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -22,11 +22,16 @@ import org.apache.crunch.types.{PType, PTypeFamily => PTF}
 import org.apache.crunch.types.writable.{WritableTypeFamily, Writables => CWritables}
 import org.apache.crunch.types.avro.{AvroType, AvroTypeFamily, Avros => CAvros}
 import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float =>
JFloat, Boolean => JBoolean}
+import java.lang.reflect.{Array => RArray}
 import java.util.{Collection => JCollection}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
+import scala.reflect.runtime.universe._
+import scala.reflect.runtime.currentMirror
 import org.apache.hadoop.io.Writable
 import org.apache.avro.specific.SpecificRecord
+import java.nio.ByteBuffer
+import com.google.common.collect.Lists
 
 class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean =
false) extends MapFn[S, T] {
   override def initialize() {
@@ -53,6 +58,18 @@ object GeneratedTupleHelper {
   }
 }
 
+class TypeMapFn(val rc: Class[_], @transient var ctor: java.lang.reflect.Constructor[_] =
null)
+  extends MapFn[TupleN, Product] {
+
+  override def initialize {
+    this.ctor = rc.getConstructors().apply(0)
+  }
+
+  override def map(x: TupleN): Product = {
+    ctor.newInstance(x.getValues : _*).asInstanceOf[Product]
+  }
+}
+
 trait BasePTypeFamily {
   def ptf: PTF
 
@@ -63,7 +80,11 @@ trait BasePTypeFamily {
 
 trait PTypeFamily extends GeneratedTuplePTypeFamily {
 
-  def writables[T <: Writable : ClassTag]: PType[T]
+  def writables[T <: Writable](clazz: Class[T]): PType[T]
+
+  def writables[T <: Writable : ClassTag]: PType[T] = {
+    writables[T](implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
+  }
 
   def as[T](ptype: PType[T]) = ptf.as(ptype)
 
@@ -71,7 +92,9 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily {
 
   val bytes = ptf.bytes()
 
-  def records[T: ClassTag] = ptf.records(implicitly[ClassTag[T]].runtimeClass)
+  def records[T: ClassTag]: PType[T] = records(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
+
+  def records[T](clazz: Class[T]): PType[T] = ptf.records(clazz)
 
   def derivedImmutable[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt:
PType[S]) = {
     ptf.derivedImmutable(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
@@ -141,9 +164,38 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily {
     derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype))
   }
 
-  def maps[T](ptype: PType[T]) = {
-    derived(classOf[Map[String, T]], {x: java.util.Map[String, T] => mapAsScalaMap(x).toMap},

-        mapAsJavaMap[String, T], ptf.maps(ptype))
+  def maps[T](ptype: PType[T]): PType[Map[String, T]] = maps(strings, ptype)
+
+  def maps[K, V](keyType: PType[K], valueType: PType[V]): PType[Map[K, V]] = {
+    if (classOf[String].equals(keyType.getTypeClass)) {
+      derived(classOf[Map[String, V]],
+        { x: java.util.Map[String, V] => mapAsScalaMap(x).toMap},
+        mapAsJavaMap[String, V],
+        ptf.maps(valueType)).asInstanceOf[PType[Map[K, V]]]
+    } else {
+      derived(classOf[Map[K, V]],
+        {x: JCollection[CPair[K, V]] => Map[K, V](x.map(y => (y.first(), y.second())).toArray
: _*)},
+        {x: Map[K, V] => asJavaCollection(x.toIterable.map(y => CPair.of(y._1, y._2)))},
+        ptf.collections(ptf.pairs(keyType, valueType)))
+    }
+  }
+
+  def arrays[T](ptype: PType[T]): PType[Array[T]] = {
+    val in = (x: JCollection[_]) => {
+      val ret = RArray.newInstance(ptype.getTypeClass, x.size())
+      var i = 0
+      val iter = x.iterator()
+      while (iter.hasNext) {
+        RArray.set(ret, i, iter.next())
+        i += 1
+      }
+      ret.asInstanceOf[Array[T]]
+    }
+    val out = (x: Array[T]) => Lists.newArrayList(x: _*).asInstanceOf[JCollection[_]]
+    derived(classOf[Array[T]],
+      in, out,
+      ptf.collections(ptype).asInstanceOf[PType[JCollection[_]]])
+      .asInstanceOf[PType[Array[T]]]
   }
 
   def lists[T](ptype: PType[T]) = {
@@ -175,20 +227,104 @@ trait PTypeFamily extends GeneratedTuplePTypeFamily {
     val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4)
     derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4))
   }
+
+  def namedTuples(tupleName: String, fields: List[(String, PType[_])]): PType[TupleN]
+
+  def caseClasses[T <: Product : TypeTag]: PType[T] = products[T](implicitly[TypeTag[T]].tpe)
+
+  private def products[T <: Product](tpe: Type): PType[T] = {
+    val ctor = tpe.member(nme.CONSTRUCTOR).asMethod
+    val args = ctor.paramss.head.map(x => (x.name.toString, typeToPType(x.typeSignature)))
+    val out = (x: Product) => TupleN.of(x.productIterator.toArray.asInstanceOf[Array[Object]]
: _*)
+    val rtc = currentMirror.runtimeClass(tpe)
+    val base = namedTuples(rtc.getCanonicalName, args)
+    ptf.derivedImmutable(classOf[Product], new TypeMapFn(rtc), new TMapFn[Product, TupleN](out),
base)
+      .asInstanceOf[PType[T]]
+  }
+
+  private val classToPrimitivePType = Map(
+    classOf[Int] -> ints,
+    classOf[java.lang.Integer] -> jints,
+    classOf[Long] -> longs,
+    classOf[java.lang.Long] -> jlongs,
+    classOf[Boolean] -> booleans,
+    classOf[java.lang.Boolean] -> jbooleans,
+    classOf[Double] -> doubles,
+    classOf[java.lang.Double] -> jdoubles,
+    classOf[Float] -> floats,
+    classOf[java.lang.Float] -> jfloats,
+    classOf[String] -> strings,
+    classOf[ByteBuffer] -> bytes
+  )
+
+  private val typeToPTypeCache: collection.mutable.Map[Type, PType[_]] = new collection.mutable.HashMap()
+
+  private def encache[T](tpe: Type, pt: PType[_]) = {
+    typeToPTypeCache.put(tpe, pt)
+    pt.asInstanceOf[PType[T]]
+  }
+
+  private def typeToPType[T](tpe: Type): PType[T] = {
+    val cpt = typeToPTypeCache.get(tpe)
+    if (cpt.isDefined) {
+      return cpt.get.asInstanceOf[PType[T]]
+    }
+
+    val rtc = currentMirror.runtimeClass(tpe)
+    val ret = classToPrimitivePType.get(rtc)
+    if (ret != null) {
+      return ret.asInstanceOf[PType[T]]
+    } else if (classOf[Writable].isAssignableFrom(rtc)) {
+      return writables(rtc.asInstanceOf[Class[Writable]]).asInstanceOf[PType[T]]
+    } else if (tpe.typeSymbol.asClass.isCaseClass) {
+      return encache(tpe, products(tpe))
+    } else {
+      val targs = if (tpe.isInstanceOf[TypeRefApi]) {
+        tpe.asInstanceOf[TypeRefApi].args
+      } else {
+        List()
+      }
+
+      if (targs.isEmpty) {
+        return encache(tpe, records(rtc))
+      } else if (targs.size == 1) {
+        if (rtc.isArray) {
+          return encache(tpe, arrays(typeToPType(targs(0))))
+        } else if (classOf[List[_]].isAssignableFrom(rtc)) {
+          return encache(tpe, lists(typeToPType(targs(0))))
+        } else if (classOf[Set[_]].isAssignableFrom(rtc)) {
+          return encache(tpe, sets(typeToPType(targs(0))))
+        } else if (classOf[Option[_]].isAssignableFrom(rtc)) {
+          return encache(tpe, options(typeToPType(targs(0))))
+        } else if (classOf[Iterable[_]].isAssignableFrom(rtc)) {
+          return encache(tpe, collections(typeToPType(targs(0))))
+        }
+      } else if (targs.size == 2) {
+        if (classOf[Either[_, _]].isAssignableFrom(rtc)) {
+          return encache(tpe, eithers(typeToPType(targs(0)), typeToPType(targs(1))))
+        } else if (classOf[Map[_, _]].isAssignableFrom(rtc)) {
+          return encache(tpe, maps(typeToPType(targs(0)), typeToPType(targs(1))))
+        }
+      }
+    }
+    throw new IllegalArgumentException("Could not handle class type = " + tpe)
+  }
 }
 
 object Writables extends PTypeFamily {
   override def ptf = WritableTypeFamily.getInstance()
 
-  override def writables[T <: Writable : ClassTag] = CWritables.writables(
-    implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
+  override def writables[T <: Writable](clazz: Class[T]) = CWritables.writables(clazz)
+
+  override def namedTuples(tupleName: String, fields: List[(String, PType[_])]) = {
+    ptf.tuples(fields.map(_._2).toArray :_*)
+  }
 }
 
 object Avros extends PTypeFamily {
   override def ptf = AvroTypeFamily.getInstance()
 
-  override def writables[T <: Writable : ClassTag] = CAvros.writables(
-    implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
+  override def writables[T <: Writable](clazz: Class[T]) = CAvros.writables(clazz)
 
   override def records[T: ClassTag] = reflects()(implicitly[ClassTag[T]])
 
@@ -201,4 +337,8 @@ object Avros extends PTypeFamily {
     val schema = ScalaSafeReflectData.getInstance().getSchema(clazz)
     CAvros.reflects(clazz, schema)
   }
+
+  override def namedTuples(tupleName: String, fields: List[(String, PType[_])]) = {
+    CAvros.namedTuples(tupleName, fields.map(_._1).toArray, fields.map(_._2).toArray)
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/71d59b69/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/TupleNTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/TupleNTest.scala b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/TupleNTest.scala
index 640a72f..0810aa3 100644
--- a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/TupleNTest.scala
+++ b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/TupleNTest.scala
@@ -23,10 +23,34 @@ package org.apache.crunch.scrunch
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 
+/** Case classes for testing purposes */
+case class One(a: Int, b: String, c: List[java.lang.Long], d: Array[Long])
+case class Two(a: One, b: Set[Option[Boolean]], c: Map[String, Double], d: Map[Int, String])
+case class Three(a: List[One], b: Array[Either[One, Two]])
+
 class TupleNTest extends JUnitSuite{
   @Test def testTupleN {
     val pc = Mem.collectionOf((1, 2, "a", 3, "b"), (4, 5, "a", 6, "c"))
     val res = pc.map(x => (x._3, x._4)).groupByKey.combineValues(Aggregators.sum[Int]).materialize
     org.junit.Assert.assertEquals(List(("a", 9)), res.toList)
   }
+
+  /**
+   * Basically, we just want to validate that we can generate schemas for these classes successfully
+   */
+  val ones = Array(One(1, "a", List(17L, 29L), Array(12L, 13L)), One(2, "b", List(0L), Array(17L,
29L)))
+  val twos = Array(Two(ones(0), Set(Some(true), None), Map("a" -> 1.2, "b" -> 2.9),
Map(1 -> "a", 2 -> "b")))
+  val threes = Array(Three(ones.toList, Array(Left(ones(0)), Right(twos(0)))))
+
+  @Test def onesTest {
+    val pc = Mem.collectionOf(ones : _*)
+  }
+
+  @Test def twosTest {
+    val pc = Mem.collectionOf(twos : _*)
+  }
+
+  @Test def threesTest {
+    val pc = Mem.collectionOf(threes : _*)
+  }
 }


Mime
View raw message