Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B34B311CE0 for ; Sat, 17 May 2014 22:20:23 +0000 (UTC) Received: (qmail 78033 invoked by uid 500); 17 May 2014 22:20:17 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 50615 invoked by uid 500); 17 May 2014 21:55:17 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 43473 invoked by uid 99); 17 May 2014 21:37:56 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 17 May 2014 21:37:56 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E816692E624; Sat, 17 May 2014 21:37:55 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: <447140d5759b462fa99998dd22a364be@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-385: Cleanup Scrunch for Scala 2.10 Date: Sat, 17 May 2014 21:37:55 +0000 (UTC) Repository: crunch Updated Branches: refs/heads/apache-crunch-0.8 d67e899f5 -> 2e2a7ddd2 CRUNCH-385: Cleanup Scrunch for Scala 2.10 Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2e2a7ddd Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2e2a7ddd Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2e2a7ddd Branch: refs/heads/apache-crunch-0.8 Commit: 2e2a7ddd2e6a4569d171067a1497fd372b1e1fe7 Parents: d67e899 Author: Josh Wills Authored: Thu May 15 17:55:09 2014 -0700 Committer: Josh Wills Committed: Sat May 17 14:30:42 2014 -0700 ---------------------------------------------------------------------- .../org/apache/crunch/scrunch/CogroupTest.scala | 2 -- .../org/apache/crunch/scrunch/CrunchSuite.scala | 2 +- .../apache/crunch/scrunch/DeepCopyTest.scala | 3 +-- .../org/apache/crunch/scrunch/JoinTest.scala | 2 -- .../org/apache/crunch/scrunch/PTableTest.scala | 2 -- .../crunch/scrunch/PageRankClassTest.scala | 2 -- .../apache/crunch/scrunch/PageRankTest.scala | 2 -- .../apache/crunch/scrunch/PipelineAppTest.scala | 3 --- .../org/apache/crunch/scrunch/TopTest.scala | 2 -- .../org/apache/crunch/scrunch/UnionTest.scala | 2 -- .../apache/crunch/scrunch/WordCountTest.scala | 2 -- .../org/apache/crunch/scrunch/Conversions.scala | 14 +++++++++---- .../crunch/scrunch/EmbeddedPipeline.scala | 3 ++- .../scala/org/apache/crunch/scrunch/Mem.scala | 2 +- .../apache/crunch/scrunch/PCollectionLike.scala | 2 ++ .../org/apache/crunch/scrunch/PTable.scala | 3 +-- .../org/apache/crunch/scrunch/PTypeFamily.scala | 22 ++++++++++++++------ .../org/apache/crunch/scrunch/Pipeline.scala | 13 ++++++------ 18 files changed, 41 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala index 40df05b..c7e53ae 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CogroupTest.scala @@ -18,9 +18,7 @@ package org.apache.crunch.scrunch import org.apache.crunch.io.{From => from} -import org.apache.crunch.test.CrunchTestSupport -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Test class CogroupTest extends CrunchSuite { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala index 37a4fe2..ab9cd0d 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/CrunchSuite.scala @@ -18,7 +18,7 @@ package org.apache.crunch.scrunch import org.scalatest.junit.JUnitSuite -import org.junit.{After, Before, Rule} +import org.junit.{After, Before} import org.apache.crunch.test.TemporaryPath class CrunchSuite extends JUnitSuite{ http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala index f30965a..c931754 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/DeepCopyTest.scala @@ -17,8 +17,7 @@ */ package org.apache.crunch.scrunch -import org.apache.crunch.impl.mr.MRPipeline -import org.apache.crunch.io.{From => from, To => to} +import org.apache.crunch.io.{From => from} import org.apache.crunch.types.avro.{Avros => A} import org.apache.avro.file.DataFileWriter import org.apache.hadoop.fs.{Path, FSDataOutputStream} http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala index bd640eb..57f974d 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/JoinTest.scala @@ -18,9 +18,7 @@ package org.apache.crunch.scrunch import org.apache.crunch.io.{From => from, To => to} -import org.apache.crunch.test.CrunchTestSupport -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Test class JoinTest extends CrunchSuite { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala index 35b2a8d..bae1b4e 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PTableTest.scala @@ -18,11 +18,9 @@ package org.apache.crunch.scrunch import org.apache.crunch.io.{From => from, To => to} -import org.apache.crunch.test.CrunchTestSupport import _root_.org.junit.Assert._ import _root_.org.junit.Test -import org.scalatest.junit.JUnitSuite /** * Tests functionality of Scala PTable. http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala index 55e7783..585c7aa 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankClassTest.scala @@ -21,11 +21,9 @@ import Avros._ import org.apache.crunch.{DoFn, Emitter, Pair => P} import org.apache.crunch.io.{From => from} -import org.apache.crunch.test.CrunchTestSupport import scala.collection.mutable.HashMap -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Assert._ import _root_.org.junit.Test http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala index 833fce8..dbd14aa 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PageRankTest.scala @@ -21,11 +21,9 @@ import Avros._ import org.apache.crunch.{DoFn, Emitter, Pair => P} import org.apache.crunch.io.{From => from} -import org.apache.crunch.test.CrunchTestSupport import scala.collection.mutable.HashMap -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Assert._ import _root_.org.junit.Test http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala index fffd529..c566e59 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/PipelineAppTest.scala @@ -17,9 +17,6 @@ */ package org.apache.crunch.scrunch -import org.apache.crunch.test.CrunchTestSupport - -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Test object WordCount extends PipelineApp { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala index ca728e8..186ec27 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/TopTest.scala @@ -18,9 +18,7 @@ package org.apache.crunch.scrunch import org.apache.crunch.io.{From => from, To => to} -import org.apache.crunch.test.CrunchTestSupport -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Test class TopTest extends CrunchSuite { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala index a507e37..f62cef3 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/UnionTest.scala @@ -18,9 +18,7 @@ package org.apache.crunch.scrunch import org.apache.crunch.io.{From => from} -import org.apache.crunch.test.CrunchTestSupport -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Test class UnionTest extends CrunchSuite { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala index 2cc1457..7ee4de0 100644 --- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala +++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/WordCountTest.scala @@ -18,9 +18,7 @@ package org.apache.crunch.scrunch import org.apache.crunch.io.{From => from, To => to} -import org.apache.crunch.test.CrunchTestSupport -import org.scalatest.junit.JUnitSuite import _root_.org.junit.Test class WordCountTest extends CrunchSuite { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala index 7928fc8..744685f 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Conversions.scala @@ -22,6 +22,8 @@ import org.apache.crunch.{Pair => CPair} import org.apache.crunch.types.PType import java.nio.ByteBuffer import scala.collection.Iterable +import scala.reflect.ClassTag +import org.apache.hadoop.io.Writable trait CanParallelTransform[El, To] { def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To @@ -73,6 +75,14 @@ object PTypeH { implicit val booleans = new PTypeH[Boolean] { def get(ptf: PTypeFamily) = ptf.booleans } implicit val bytes = new PTypeH[ByteBuffer] { def get(ptf: PTypeFamily) = ptf.bytes } + implicit def writables[W <: Writable : ClassTag] = new PTypeH[W] { + def get(ptf: PTypeFamily): PType[W] = ptf.writables(implicitly[ClassTag[W]]) + } + + implicit def records[T <: AnyRef : ClassTag] = new PTypeH[T] { + def get(ptf: PTypeFamily) = ptf.records(implicitly[ClassTag[T]]).asInstanceOf[PType[T]] + } + implicit def collections[T: PTypeH] = { new PTypeH[Iterable[T]] { def get(ptf: PTypeFamily) = { @@ -130,10 +140,6 @@ object PTypeH { } } } - - implicit def records[T <: AnyRef : ClassManifest] = new PTypeH[T] { - def get(ptf: PTypeFamily) = ptf.records(classManifest[T]).asInstanceOf[PType[T]] - } } object Conversions { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala index f3be7ba..e9df263 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/EmbeddedPipeline.scala @@ -18,6 +18,7 @@ package org.apache.crunch.scrunch import org.apache.hadoop.conf.Configuration +import scala.reflect.ClassTag /** * Adds a pipeline to the class it is being mixed in to. @@ -32,7 +33,7 @@ trait EmbeddedPipeline { */ trait MREmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike { protected val pipeline: Pipeline = { - Pipeline.mapReduce(ClassManifest.fromClass(getClass()).erasure, new Configuration()) + Pipeline.mapReduce(getClass(), new Configuration()) } } http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala index 0e863a3..58d646f 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Mem.scala @@ -37,7 +37,7 @@ object Mem extends MemEmbeddedPipeline with PipelineHelper { /** * Constructs a PCollection using in memory data. * - * @param collect The data to load. + * @param ts The data to load. * @return A PCollection containing the specified data. */ def collectionOf[T](ts: T*)(implicit pt: PTypeH[T]): PCollection[T] = { http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala index 68fe7a5..c0162df 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala @@ -20,6 +20,8 @@ package org.apache.crunch.scrunch import org.apache.crunch.DoFn import org.apache.crunch.{PCollection => JCollection, Pair => JPair, Target} import org.apache.crunch.types.{PType, PTableType} +import org.apache.crunch.types.writable.WritableTypeFamily +import scala.reflect.ClassTag trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] { val native: NativeType http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala index 7d5bd66..646aded 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala @@ -24,9 +24,8 @@ import scala.collection.JavaConversions._ import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn} import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair} import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables} -import org.apache.crunch.lib.join.{JoinStrategy, DefaultJoinStrategy, JoinType} +import org.apache.crunch.lib.join.{DefaultJoinStrategy, JoinType} import org.apache.crunch.scrunch.interpreter.InterpreterRunner -import java.util class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] { import PTable._ http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala index 636b4e2..9a30d58 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTypeFamily.scala @@ -19,11 +19,13 @@ package org.apache.crunch.scrunch import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn} import org.apache.crunch.types.{PType, PTypeFamily => PTF} -import org.apache.crunch.types.writable.WritableTypeFamily +import org.apache.crunch.types.writable.{WritableTypeFamily, Writables => CWritables} import org.apache.crunch.types.avro.{AvroType, AvroTypeFamily, Avros => CAvros} import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JFloat, Boolean => JBoolean} import java.util.{Collection => JCollection} import scala.collection.JavaConversions._ +import scala.reflect.ClassTag +import org.apache.hadoop.io.Writable class TMapFn[S, T](val f: S => T, val pt: Option[PType[S]] = None, var init: Boolean = false) extends MapFn[S, T] { override def initialize() { @@ -40,11 +42,15 @@ trait PTypeFamily { def ptf: PTF + def writables[T <: Writable : ClassTag]: PType[T] + + def as[T](ptype: PType[T]) = ptf.as(ptype) + val strings = ptf.strings() val bytes = ptf.bytes() - def records[T: ClassManifest] = ptf.records(classManifest[T].erasure) + def records[T: ClassTag] = ptf.records(implicitly[ClassTag[T]].runtimeClass) def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = { ptf.derived(cls, new TMapFn[S, T](in, Some(pt)), new TMapFn[T, S](out), pt) @@ -80,6 +86,8 @@ trait PTypeFamily { derived(classOf[Boolean], in, out, ptf.booleans()) } + def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType) + def collections[T](ptype: PType[T]) = { derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype)) } @@ -118,18 +126,20 @@ trait PTypeFamily { val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4) derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4)) } - - def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType) } object Writables extends PTypeFamily { override def ptf = WritableTypeFamily.getInstance() + + override def writables[T <: Writable : ClassTag] = CWritables.writables( + implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) } object Avros extends PTypeFamily { override def ptf = AvroTypeFamily.getInstance() - CAvros.REFLECT_DATA_FACTORY = new ScalaReflectDataFactory() + override def writables[T <: Writable : ClassTag] = CAvros.writables( + implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]]) - def reflects[T: ClassManifest]() = CAvros.reflects(classManifest[T].erasure).asInstanceOf[AvroType[T]] + def reflects[T: ClassTag]() = CAvros.reflects(implicitly[ClassTag[T]].runtimeClass).asInstanceOf[AvroType[T]] } http://git-wip-us.apache.org/repos/asf/crunch/blob/2e2a7ddd/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala ---------------------------------------------------------------------- diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala index ebf3165..d250e9b 100644 --- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala +++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Pipeline.scala @@ -27,6 +27,7 @@ import org.apache.crunch.impl.mem.MemPipeline import org.apache.crunch.impl.mr.MRPipeline import org.apache.crunch.util.DistCache import org.apache.crunch.scrunch.interpreter.InterpreterRunner +import scala.reflect.ClassTag /** * Manages the state of a pipeline execution. @@ -131,7 +132,7 @@ object Pipeline { * * @tparam T Type of the class using the pipeline. */ - def mapReduce[T : ClassManifest]: MapReducePipeline = mapReduce[T](new Configuration()) + def mapReduce[T : ClassTag]: MapReducePipeline = mapReduce[T](new Configuration()) /** * Creates a pipeline for running jobs on a hadoop cluster. @@ -139,8 +140,8 @@ object Pipeline { * @param configuration Hadoop configuration to use. * @tparam T Type of the class using the pipeline. */ - def mapReduce[T : ClassManifest](configuration: Configuration): MapReducePipeline = { - new MapReducePipeline(implicitly[ClassManifest[T]].erasure, configuration) + def mapReduce[T : ClassTag](configuration: Configuration): MapReducePipeline = { + new MapReducePipeline(implicitly[ClassTag[T]].runtimeClass, configuration) } /** @@ -153,12 +154,12 @@ object Pipeline { * * @param configuration Configuration for connecting to a Hadoop cluster. * @param memory Option specifying whether or not the pipeline is an in memory or mapreduce pipeline. - * @param manifest ClassManifest for the class using the pipeline. + * @param ctag ClassTag for the class using the pipeline. * @tparam T type of the class using the pipeline. * @deprecated Use either {{{Pipeline.mapReduce(class, conf)}}} or {{{Pipeline.inMemory}}} */ def apply[T]( configuration: Configuration = new Configuration(), - memory: Boolean = false)(implicit manifest: ClassManifest[T] - ): Pipeline = if (memory) inMemory else mapReduce(manifest.erasure, configuration) + memory: Boolean = false)(implicit ctag: ClassTag[T] + ): Pipeline = if (memory) inMemory else mapReduce(ctag.runtimeClass, configuration) }