crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-385: Cleanup Scrunch for Scala 2.10
Date Sat, 17 May 2014 21:37:55 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 d67e899f5 -> 2e2a7ddd2


CRUNCH-385: Cleanup Scrunch for Scala 2.10


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2e2a7ddd
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2e2a7ddd
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2e2a7ddd

Branch: refs/heads/apache-crunch-0.8
Commit: 2e2a7ddd2e6a4569d171067a1497fd372b1e1fe7
Parents: d67e899
Author: Josh Wills <jwills@apache.org>
Authored: Thu May 15 17:55:09 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sat May 17 14:30:42 2014 -0700

----------------------------------------------------------------------
 .../org/apache/crunch/scrunch/CogroupTest.scala |  2 --
 .../org/apache/crunch/scrunch/CrunchSuite.scala |  2 +-
 .../apache/crunch/scrunch/DeepCopyTest.scala    |  3 +--
 .../org/apache/crunch/scrunch/JoinTest.scala    |  2 --
 .../org/apache/crunch/scrunch/PTableTest.scala  |  2 --
 .../crunch/scrunch/PageRankClassTest.scala      |  2 --
 .../apache/crunch/scrunch/PageRankTest.scala    |  2 --
 .../apache/crunch/scrunch/PipelineAppTest.scala |  3 ---
 .../org/apache/crunch/scrunch/TopTest.scala     |  2 --
 .../org/apache/crunch/scrunch/UnionTest.scala   |  2 --
 .../apache/crunch/scrunch/WordCountTest.scala   |  2 --
 .../org/apache/crunch/scrunch/Conversions.scala | 14 +++++++++----
 .../crunch/scrunch/EmbeddedPipeline.scala       |  3 ++-
 .../scala/org/apache/crunch/scrunch/Mem.scala   |  2 +-
 .../apache/crunch/scrunch/PCollectionLike.scala |  2 ++
 .../org/apache/crunch/scrunch/PTable.scala      |  3 +--
 .../org/apache/crunch/scrunch/PTypeFamily.scala | 22 ++++++++++++++------
 .../org/apache/crunch/scrunch/Pipeline.scala    | 13 ++++++------
 18 files changed, 41 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala
index 40df05b..c7e53ae 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala
@@ -18,9 +18,7 @@
 package org.apache.crunch.scrunch
 
 import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
 
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Test
 
 class CogroupTest extends CrunchSuite {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala
index 37a4fe2..ab9cd0d 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.crunch.scrunch
 
 import org.scalatest.junit.JUnitSuite
-import org.junit.{After, Before, Rule}
+import org.junit.{After, Before}
 import org.apache.crunch.test.TemporaryPath
 
 class CrunchSuite extends JUnitSuite{

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
index f30965a..c931754 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala
@@ -17,8 +17,7 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.impl.mr.MRPipeline
-import org.apache.crunch.io.{From => from, To => to}
+import org.apache.crunch.io.{From => from}
 import org.apache.crunch.types.avro.{Avros => A}
 import org.apache.avro.file.DataFileWriter
 import org.apache.hadoop.fs.{Path, FSDataOutputStream}

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
index bd640eb..57f974d 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala
@@ -18,9 +18,7 @@
 package org.apache.crunch.scrunch
 
 import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
 
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Test
 
 class JoinTest extends CrunchSuite {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
index 35b2a8d..bae1b4e 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala
@@ -18,11 +18,9 @@
 package org.apache.crunch.scrunch
 
 import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
 
 import _root_.org.junit.Assert._
 import _root_.org.junit.Test
-import org.scalatest.junit.JUnitSuite
 
 /**
  * Tests functionality of Scala PTable.

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
index 55e7783..585c7aa 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala
@@ -21,11 +21,9 @@ import Avros._
 
 import org.apache.crunch.{DoFn, Emitter, Pair => P}
 import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
 
 import scala.collection.mutable.HashMap
 
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Assert._
 import _root_.org.junit.Test
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
index 833fce8..dbd14aa 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala
@@ -21,11 +21,9 @@ import Avros._
 
 import org.apache.crunch.{DoFn, Emitter, Pair => P}
 import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
 
 import scala.collection.mutable.HashMap
 
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Assert._
 import _root_.org.junit.Test
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
index fffd529..c566e59 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala
@@ -17,9 +17,6 @@
  */
 package org.apache.crunch.scrunch
 
-import org.apache.crunch.test.CrunchTestSupport
-
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Test
 
 object WordCount extends PipelineApp {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala
index ca728e8..186ec27 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala
@@ -18,9 +18,7 @@
 package org.apache.crunch.scrunch
 
 import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
 
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Test
 
 class TopTest extends CrunchSuite {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala
index a507e37..f62cef3 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala
@@ -18,9 +18,7 @@
 package org.apache.crunch.scrunch
 
 import org.apache.crunch.io.{From => from}
-import org.apache.crunch.test.CrunchTestSupport
 
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Test
 
 class UnionTest extends CrunchSuite {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala
index 2cc1457..7ee4de0 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala
@@ -18,9 +18,7 @@
 package org.apache.crunch.scrunch
 
 import org.apache.crunch.io.{From => from, To => to}
-import org.apache.crunch.test.CrunchTestSupport
 
-import org.scalatest.junit.JUnitSuite
 import _root_.org.junit.Test
 
 class WordCountTest extends CrunchSuite {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
index 7928fc8..744685f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala
@@ -22,6 +22,8 @@ import org.apache.crunch.{Pair => CPair}
 import org.apache.crunch.types.PType
 import java.nio.ByteBuffer
 import scala.collection.Iterable
+import scala.reflect.ClassTag
+import org.apache.hadoop.io.Writable
 
 trait CanParallelTransform[El, To] {
   def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To
@@ -73,6 +75,14 @@ object PTypeH {
   implicit val booleans = new PTypeH[Boolean] { def get(ptf: PTypeFamily) = ptf.booleans }
   implicit val bytes = new PTypeH[ByteBuffer] { def get(ptf: PTypeFamily) = ptf.bytes }
 
+  implicit def writables[W <: Writable : ClassTag] = new PTypeH[W] {
+    def get(ptf: PTypeFamily): PType[W] = ptf.writables(implicitly[ClassTag[W]])
+  }
+
+  implicit def records[T <: AnyRef : ClassTag] = new PTypeH[T] {
+    def get(ptf: PTypeFamily) = ptf.records(implicitly[ClassTag[T]]).asInstanceOf[PType[T]]
+  }
+
   implicit def collections[T: PTypeH] = {
     new PTypeH[Iterable[T]] {
       def get(ptf: PTypeFamily) = {
@@ -130,10 +140,6 @@ object PTypeH {
       }
     }
   }
-
-  implicit def records[T <: AnyRef : ClassManifest] = new PTypeH[T] {
-    def get(ptf: PTypeFamily) = ptf.records(classManifest[T]).asInstanceOf[PType[T]]
-  }
 }
 
 object Conversions {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
index f3be7ba..e9df263 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala
@@ -18,6 +18,7 @@
 package org.apache.crunch.scrunch
 
 import org.apache.hadoop.conf.Configuration
+import scala.reflect.ClassTag
 
 /**
  * Adds a pipeline to the class it is being mixed in to.
@@ -32,7 +33,7 @@ trait EmbeddedPipeline {
  */
 trait MREmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike {
   protected val pipeline: Pipeline = {
-    Pipeline.mapReduce(ClassManifest.fromClass(getClass()).erasure, new Configuration())
+    Pipeline.mapReduce(getClass(), new Configuration())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
index 0e863a3..58d646f 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala
@@ -37,7 +37,7 @@ object Mem extends MemEmbeddedPipeline with PipelineHelper {
   /**
    * Constructs a PCollection using in memory data.
    *
-   * @param collect The data to load.
+   * @param ts The data to load.
    * @return A PCollection containing the specified data.
    */
   def collectionOf[T](ts: T*)(implicit pt: PTypeH[T]): PCollection[T] = {

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index 68fe7a5..c0162df 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -20,6 +20,8 @@ package org.apache.crunch.scrunch
 import org.apache.crunch.DoFn
 import org.apache.crunch.{PCollection => JCollection, Pair => JPair, Target}
 import org.apache.crunch.types.{PType, PTableType}
+import org.apache.crunch.types.writable.WritableTypeFamily
+import scala.reflect.ClassTag
 
 trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
   val native: NativeType

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 7d5bd66..646aded 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -24,9 +24,8 @@ import scala.collection.JavaConversions._
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
 import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
 import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables}
-import org.apache.crunch.lib.join.{JoinStrategy, DefaultJoinStrategy, JoinType}
+import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
-import java.util
 
 class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
   import PTable._

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
index 636b4e2..9a30d58 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala
@@ -19,11 +19,13 @@ package org.apache.crunch.scrunch
 
 import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn}
 import org.apache.crunch.types.{PType, PTypeFamily => PTF}
-import org.apache.crunch.types.writable.WritableTypeFamily
+import org.apache.crunch.types.writable.{WritableTypeFamily, Writables => CWritables}
 import org.apache.crunch.types.avro.{AvroType, AvroTypeFamily, Avros => CAvros}
 import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JFloat, Boolean => JBoolean}
 import java.util.{Collection => JCollection}
 import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+import org.apache.hadoop.io.Writable
 
 class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] {
   override def initialize() {
@@ -40,11 +42,15 @@ trait PTypeFamily {
 
   def ptf: PTF
 
+  def writables[T <: Writable : ClassTag]: PType[T]
+
+  def as[T](ptype: PType[T]) = ptf.as(ptype)
+
   val strings = ptf.strings()
 
   val bytes = ptf.bytes()
 
-  def records[T: ClassManifest] = ptf.records(classManifest[T].erasure)
+  def records[T: ClassTag] = ptf.records(implicitly[ClassTag[T]].runtimeClass)
 
   def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
     ptf.derived(cls, new TMapFn[S, T](in, Some(pt)), new TMapFn[T, S](out), pt)
@@ -80,6 +86,8 @@ trait PTypeFamily {
     derived(classOf[Boolean], in, out, ptf.booleans())
   }
 
+  def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType)
+
   def collections[T](ptype: PType[T]) = {
     derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype))
   }
@@ -118,18 +126,20 @@ trait PTypeFamily {
     val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4)
     derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4))
   }
-
-  def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType)
 }
 
 object Writables extends PTypeFamily {
   override def ptf = WritableTypeFamily.getInstance()
+
+  override def writables[T <: Writable : ClassTag] = CWritables.writables(
+    implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
 }
 
 object Avros extends PTypeFamily {
   override def ptf = AvroTypeFamily.getInstance()
 
-  CAvros.REFLECT_DATA_FACTORY = new ScalaReflectDataFactory()
+  override def writables[T <: Writable : ClassTag] = CAvros.writables(
+    implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
 
-  def reflects[T: ClassManifest]() = CAvros.reflects(classManifest[T].erasure).asInstanceOf[AvroType[T]]
+  def reflects[T: ClassTag]() = CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]]
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
index ebf3165..d250e9b 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala
@@ -27,6 +27,7 @@ import org.apache.crunch.impl.mem.MemPipeline
 import org.apache.crunch.impl.mr.MRPipeline
 import org.apache.crunch.util.DistCache
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
+import scala.reflect.ClassTag
 
 /**
  * Manages the state of a pipeline execution.
@@ -131,7 +132,7 @@ object Pipeline {
    *
    * @tparam T Type of the class using the pipeline.
    */
-  def mapReduce[T : ClassManifest]: MapReducePipeline = mapReduce[T](new Configuration())
+  def mapReduce[T : ClassTag]: MapReducePipeline = mapReduce[T](new Configuration())
 
   /**
    * Creates a pipeline for running jobs on a hadoop cluster.
@@ -139,8 +140,8 @@ object Pipeline {
    * @param configuration Hadoop configuration to use.
    * @tparam T Type of the class using the pipeline.
    */
-  def mapReduce[T : ClassManifest](configuration: Configuration): MapReducePipeline = {
-    new MapReducePipeline(implicitly[ClassManifest[T]].erasure, configuration)
+  def mapReduce[T : ClassTag](configuration: Configuration): MapReducePipeline = {
+    new MapReducePipeline(implicitly[ClassTag[T]].runtimeClass, configuration)
   }
 
   /**
@@ -153,12 +154,12 @@ object Pipeline {
    *
    * @param configuration Configuration for connecting to a Hadoop cluster.
    * @param memory Option specifying whether or not the pipeline is an in memory or mapreduce pipeline.
-   * @param manifest ClassManifest for the class using the pipeline.
+   * @param ctag ClassTag for the class using the pipeline.
    * @tparam T type of the class using the pipeline.
    * @deprecated Use either {{{Pipeline.mapReduce(class, conf)}}} or {{{Pipeline.inMemory}}}
    */
   def apply[T](
     configuration: Configuration = new Configuration(),
-    memory: Boolean = false)(implicit manifest: ClassManifest[T]
-  ): Pipeline = if (memory) inMemory else mapReduce(manifest.erasure, configuration)
+    memory: Boolean = false)(implicit ctag: ClassTag[T]
+  ): Pipeline = if (memory) inMemory else mapReduce(ctag.runtimeClass, configuration)
 }


Mime
View raw message