incubator-crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject [26/28] Rename scrunch packages and add license headers
Date Sat, 07 Jul 2012 21:49:08 GMT
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/com/cloudera/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/com/cloudera/scrunch/Pipeline.scala b/scrunch/src/main/scala/com/cloudera/scrunch/Pipeline.scala
deleted file mode 100644
index 208dcc9..0000000
--- a/scrunch/src/main/scala/com/cloudera/scrunch/Pipeline.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import java.lang.Class
-
-import org.apache.hadoop.conf.Configuration
-
-import com.cloudera.crunch.{Pipeline => JPipeline}
-import com.cloudera.crunch.impl.mem.MemPipeline
-import com.cloudera.crunch.impl.mr.MRPipeline
-
-/**
- * Manages the state of a pipeline execution.
- *
- * ==Overview==
- * There are two subtypes of [[com.cloudera.crunch.Pipeline]]:
- * [[com.cloudera.crunch.Pipeline#MapReduce]] - for jobs run on a Hadoop cluster.
- * [[com.cloudera.crunch.Pipeline#InMemory]] - for jobs run in memory.
- *
- * To create a Hadoop pipeline:
- * {{{
- * import com.cloudera.scrunch.Pipeline
- *
- * Pipeline.mapreduce[MyClass]
- * }}}
- *
- * To get an in memory pipeline:
- * {{{
- * import com.cloudera.scrunch.Pipeline
- *
- * Pipeline.inMemory
- * }}}
- */
-class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
-  /**
-   * A convenience method for reading a text file.
-   *
-   * @param pathName Path to desired text file.
-   * @return A PCollection containing the lines in the specified file.
-   */
-  def readTextFile(pathName: String): PCollection[String] = {
-    new PCollection[String](jpipeline.readTextFile(pathName))
-  }
-
-  /**
-   * A convenience method for writing a text file.
-   *
-   * @param pcollect A PCollection to write to text.
-   * @param pathName Path to desired output text file.
-   */
-  def writeTextFile[T](pcollect: PCollection[T], pathName: String) {
-    jpipeline.writeTextFile(pcollect.native, pathName)
-  }
-}
-
-/**
- * Companion object. Contains subclasses of Pipeline.
- */
-object Pipeline {
-  /**
-   * Pipeline for running jobs on a hadoop cluster.
-   *
-   * @param clazz Type of the class using the pipeline.
-   * @param configuration Hadoop configuration to use.
-   */
-  class MapReducePipeline (clazz: Class[_], configuration: Configuration)
-    extends Pipeline(new MRPipeline(clazz, configuration))
-
-  /**
-   * Pipeline for running jobs in memory.
-   */
-  object InMemoryPipeline extends Pipeline(MemPipeline.getInstance())
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster using the default configuration.
-   *
-   * @param clazz Type of the class using the pipeline.
-   */
-  def mapReduce(clazz: Class[_]): MapReducePipeline = mapReduce(clazz, new Configuration())
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster.
-   *
-   * @param clazz Type of the class using the pipeline.
-   * @param configuration Hadoop configuration to use.
-   */
-  def mapReduce(clazz: Class[_], configuration: Configuration): MapReducePipeline = {
-    new MapReducePipeline(clazz, configuration)
-  }
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster using the default configuration.
-   *
-   * @tparam T Type of the class using the pipeline.
-   */
-  def mapReduce[T : ClassManifest]: MapReducePipeline = mapReduce[T](new Configuration())
-
-  /**
-   * Creates a pipeline for running jobs on a hadoop cluster.
-   *
-   * @param configuration Hadoop configuration to use.
-   * @tparam T Type of the class using the pipeline.
-   */
-  def mapReduce[T : ClassManifest](configuration: Configuration): MapReducePipeline = {
-    new MapReducePipeline(implicitly[ClassManifest[T]].erasure, configuration)
-  }
-
-  /**
-   * Gets a pipeline for running jobs in memory.
-   */
-  def inMemory: InMemoryPipeline.type = InMemoryPipeline
-
-  /**
-   * Creates a new Pipeline according to the provided specifications.
-   *
-   * @param configuration Configuration for connecting to a Hadoop cluster.
-   * @param memory Option specifying whether or not the pipeline is an in memory or mapreduce pipeline.
-   * @param manifest ClassManifest for the class using the pipeline.
-   * @tparam T type of the class using the pipeline.
-   * @deprecated Use either {{{Pipeline.mapReduce(class, conf)}}} or {{{Pipeline.inMemory}}}
-   */
-  def apply[T](
-    configuration: Configuration = new Configuration(),
-    memory: Boolean = false)(implicit manifest: ClassManifest[T]
-  ): Pipeline = if (memory) inMemory else mapReduce(manifest.erasure, configuration)
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/com/cloudera/scrunch/PipelineApp.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/com/cloudera/scrunch/PipelineApp.scala b/scrunch/src/main/scala/com/cloudera/scrunch/PipelineApp.scala
deleted file mode 100644
index 218ecc2..0000000
--- a/scrunch/src/main/scala/com/cloudera/scrunch/PipelineApp.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import java.io.Serializable
-
-import scala.collection.mutable.ListBuffer
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.GenericOptionsParser
-
-import com.cloudera.crunch.{Source, TableSource, Target}
-
-trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedInit {
-  implicit def _string2path(str: String) = new Path(str)
-
-  /** Contains factory methods used to create `Source`s. */
-  val from = From
-
-  /** Contains factory methods used to create `Target`s. */
-  val to = To
-
-  /** Contains factory methods used to create `SourceTarget`s. */
-  val at = At
-
-  private val initCode = new ListBuffer[() => Unit]
-
-  private var _args: Array[String] = _
-
-  /** Command-line arguments passed to this application. */
-  protected def args: Array[String] = _args
-
-  def configuration: Configuration = pipeline.getConfiguration
-
-  /** Gets the distributed filesystem associated with this application's configuration. */
-  def fs: FileSystem = FileSystem.get(configuration)
-
-  override def delayedInit(body: => Unit) {
-    initCode += (() => body)
-  }
-
-  def main(args: Array[String]) = {
-    val parser = new GenericOptionsParser(configuration, args)
-    _args = parser.getRemainingArgs()
-    for (proc <- initCode) proc()
-    done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/com/cloudera/scrunch/PipelineHelper.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/com/cloudera/scrunch/PipelineHelper.scala b/scrunch/src/main/scala/com/cloudera/scrunch/PipelineHelper.scala
deleted file mode 100644
index 28e3c83..0000000
--- a/scrunch/src/main/scala/com/cloudera/scrunch/PipelineHelper.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-/**
- * This trait provides convenience methods for building pipelines.
- */
-trait PipelineHelper {
-  /**
-   * Materializes the specified PCollection and displays its contents.
-   */
-  def dump(data: PCollection[_]) {
-    data.materialize.foreach(println(_))
-  }
-
-  /**
-   * Materializes the specified PTable and displays its contents.
-   */
-  def dump(data: PTable[_, _]) {
-    data.materialize.foreach(println(_))
-  }
-
-  /**
-   * Performs a cogroup on the two specified PTables.
-   */
-  def cogroup[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
-      : PTable[K, (Iterable[V1], Iterable[V2])] = {
-    t1.cogroup(t2)
-  }
-
-  /**
-   * Performs an innerjoin on the two specified PTables.
-   */
-  def join[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
-      : PTable[K, (V1, V2)] = {
-    t1.join(t2)
-  }
-
-  /**
-   * Unions the specified PCollections.
-   */
-  def union[T](first: PCollection[T], others: PCollection[T]*)
-      : PCollection[T] = {
-    first.union(others: _*)
-  }
-
-  /**
-   * Unions the specified PTables.
-   */
-  def union[K, V](first: PTable[K, V], others: PTable[K, V]*)
-      : PTable[K, V] = {
-    first.union(others: _*)
-  }
-}
-
-/**
- * Companion object containing convenience methods for building pipelines.
- */
-object PipelineHelper extends PipelineHelper

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/com/cloudera/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/com/cloudera/scrunch/PipelineLike.scala b/scrunch/src/main/scala/com/cloudera/scrunch/PipelineLike.scala
deleted file mode 100644
index 99e1c4f..0000000
--- a/scrunch/src/main/scala/com/cloudera/scrunch/PipelineLike.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import org.apache.hadoop.conf.Configuration
-
-import com.cloudera.crunch.{Pipeline => JPipeline}
-import com.cloudera.crunch.Source
-import com.cloudera.crunch.TableSource
-import com.cloudera.crunch.Target
-
-trait PipelineLike {
-  def jpipeline: JPipeline
-
-  /**
-   * Gets the configuration object associated with this pipeline.
-   */
-  def getConfiguration(): Configuration = jpipeline.getConfiguration()
-
-  /**
-   * Reads a source into a [[com.cloudera.scrunch.PCollection]]
-   *
-   * @param source The source to read from.
-   * @tparam T The type of the values being read.
-   * @return A PCollection containing data read from the specified source.
-   */
-  def read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source))
-
-  /**
-   * Reads a source into a [[com.cloudera.scrunch.PTable]]
-   *
-   * @param source The source to read from.
-   * @tparam K The type of the keys being read.
-   * @tparam V The type of the values being read.
-   * @return A PCollection containing data read from the specified source.
-   */
-  def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source))
-
-  /**
-   * Writes a parallel collection to a target.
-   *
-   * @param collection The collection to write.
-   * @param target The destination target for this write.
-   */
-  def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target)
-
-  /**
-   * Writes a parallel table to a target.
-   *
-   * @param table The table to write.
-   * @param target The destination target for this write.
-   */
-  def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target)
-
-  /**
-   * Constructs and executes a series of MapReduce jobs in order
-   * to write data to the output targets.
-   */
-  def run(): Unit = jpipeline.run()
-
-  /**
-   * Run any remaining jobs required to generate outputs and then
-   * clean up any intermediate data files that were created in
-   * this run or previous calls to `run`.
-   */
-  def done(): Unit = jpipeline.done()
-
-  /**
-   * Turn on debug logging for jobs that are run from this pipeline.
-   */
-  def debug(): Unit = jpipeline.enableDebug()
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala b/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
new file mode 100644
index 0000000..a704b80
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/Conversions.scala
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.{PCollection => JCollection, PGroupedTable => JGroupedTable, PTable => JTable, DoFn, Emitter}
+import org.apache.crunch.{Pair => CPair}
+import org.apache.crunch.types.PType
+import java.nio.ByteBuffer
+import scala.collection.Iterable
+
+trait CanParallelTransform[El, To] {
+  def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, El], ptype: PType[El]): To
+}
+
+trait LowPriorityParallelTransforms {
+  implicit def single[B] = new CanParallelTransform[B, PCollection[B]] {
+    def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, B], ptype: PType[B]) = {
+      c.parallelDo(fn, ptype)
+    }
+  }
+}
+
+object CanParallelTransform extends LowPriorityParallelTransforms {
+  def tableType[K, V](ptype: PType[(K, V)]) = {
+    val st = ptype.getSubTypes()
+    ptype.getFamily().tableOf(st.get(0).asInstanceOf[PType[K]], st.get(1).asInstanceOf[PType[V]])
+  }
+
+  implicit def keyvalue[K, V] = new CanParallelTransform[(K, V), PTable[K,V]] {
+    def apply[A](c: PCollectionLike[A, _, JCollection[A]], fn: DoFn[A, (K, V)], ptype: PType[(K, V)]) = {
+      c.parallelDo(kvWrapFn(fn), tableType(ptype))
+    }
+  }
+
+  def kvWrapFn[A, K, V](fn: DoFn[A, (K, V)]) = {
+    new DoFn[A, CPair[K, V]] {
+      override def process(input: A, emitFn: Emitter[CPair[K, V]]) {
+        fn.process(input, new Emitter[(K, V)] {
+          override def emit(kv: (K, V)) { emitFn.emit(CPair.of(kv._1, kv._2)) }
+          override def flush() { emitFn.flush() }
+        })
+      }
+    }
+  }
+} 
+
+trait PTypeH[T] {
+  def get(ptf: PTypeFamily): PType[T]
+}
+
+object PTypeH {
+
+  implicit val longs = new PTypeH[Long] { def get(ptf: PTypeFamily) = ptf.longs }
+  implicit val ints = new PTypeH[Int] { def get(ptf: PTypeFamily) = ptf.ints }
+  implicit val floats = new PTypeH[Float] { def get(ptf: PTypeFamily) = ptf.floats }
+  implicit val doubles = new PTypeH[Double] { def get(ptf: PTypeFamily) = ptf.doubles }
+  implicit val strings = new PTypeH[String] { def get(ptf: PTypeFamily) = ptf.strings }
+  implicit val booleans = new PTypeH[Boolean] { def get(ptf: PTypeFamily) = ptf.booleans }
+  implicit val bytes = new PTypeH[ByteBuffer] { def get(ptf: PTypeFamily) = ptf.bytes }
+
+  implicit def collections[T: PTypeH] = {
+    new PTypeH[Iterable[T]] {
+      def get(ptf: PTypeFamily) = {
+        ptf.collections(implicitly[PTypeH[T]].get(ptf))
+      }
+    }
+  }
+
+  implicit def lists[T: PTypeH] = {
+    new PTypeH[List[T]] {
+      def get(ptf: PTypeFamily) = {
+        ptf.lists(implicitly[PTypeH[T]].get(ptf))
+      }
+    }
+  }
+
+  implicit def sets[T: PTypeH] = {
+    new PTypeH[Set[T]] {
+      def get(ptf: PTypeFamily) = {
+        ptf.sets(implicitly[PTypeH[T]].get(ptf))
+      }
+    }
+  }
+
+  implicit def pairs[A: PTypeH, B: PTypeH] = {
+    new PTypeH[(A, B)] {
+      def get(ptf: PTypeFamily) = {
+        ptf.tuple2(implicitly[PTypeH[A]].get(ptf), implicitly[PTypeH[B]].get(ptf))
+      }
+    }
+  }
+
+  implicit def trips[A: PTypeH, B: PTypeH, C: PTypeH] = {
+    new PTypeH[(A, B, C)] {
+      def get(ptf: PTypeFamily) = {
+        ptf.tuple3(implicitly[PTypeH[A]].get(ptf), implicitly[PTypeH[B]].get(ptf),
+            implicitly[PTypeH[C]].get(ptf))
+      }
+    }
+  }
+
+  implicit def quads[A: PTypeH, B: PTypeH, C: PTypeH, D: PTypeH] = {
+    new PTypeH[(A, B, C, D)] {
+      def get(ptf: PTypeFamily) = {
+        ptf.tuple4(implicitly[PTypeH[A]].get(ptf), implicitly[PTypeH[B]].get(ptf),
+            implicitly[PTypeH[C]].get(ptf), implicitly[PTypeH[D]].get(ptf))
+      }
+    }
+  }
+
+  implicit def records[T <: AnyRef : ClassManifest] = new PTypeH[T] {
+    def get(ptf: PTypeFamily) = ptf.records(classManifest[T]).asInstanceOf[PType[T]]
+  }
+}
+
+object Conversions {
+  implicit def jtable2ptable[K, V](jtable: JTable[K, V]) = {
+    new PTable[K, V](jtable)
+  }
+  
+  implicit def jcollect2pcollect[S](jcollect: JCollection[S]) = {
+    new PCollection[S](jcollect)
+  }
+  
+  implicit def jgrouped2pgrouped[K, V](jgrouped: JGroupedTable[K, V]) = {
+    new PGroupedTable[K, V](jgrouped)
+  }
+
+  implicit def pair2tuple[K, V](p: CPair[K, V]) = (p.first(), p.second())
+
+  implicit def tuple2pair[K, V](t: (K, V)) = CPair.of(t._1, t._2)
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
new file mode 100644
index 0000000..8d69701
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipeline.scala
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * Adds a pipeline to the class it is being mixed in to.
+ */
+trait EmbeddedPipeline {
+  /** The pipeline to use. */
+  protected def pipeline: Pipeline
+}
+
+/**
+ * Adds a mapreduce pipeline to the class it is being mixed in to.
+ */
+trait MREmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike {
+  protected val pipeline: Pipeline = {
+    Pipeline.mapReduce(ClassManifest.fromClass(getClass()).erasure, new Configuration())
+  }
+}
+
+/**
+ * Adds an in memory pipeline to the class it is being mixed in to.
+ */
+trait MemEmbeddedPipeline extends EmbeddedPipeline with EmbeddedPipelineLike {
+  protected val pipeline: Pipeline = {
+    Pipeline.inMemory
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
new file mode 100644
index 0000000..0fbd0ea
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/EmbeddedPipelineLike.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.Source
+import org.apache.crunch.TableSource
+import org.apache.crunch.Target
+
+trait EmbeddedPipelineLike { self: EmbeddedPipeline =>
+  /**
+   * Reads a source into a [[org.apache.scrunch.PCollection]]
+   *
+   * @param source The source to read from.
+   * @tparam T The type of the values being read.
+   * @return A PCollection containing data read from the specified source.
+   */
+  def read[T](source: Source[T]): PCollection[T] = {
+    pipeline.read(source)
+  }
+
+  /**
+   * Reads a source into a [[org.apache.scrunch.PTable]]
+   *
+   * @param source The source to read from.
+   * @tparam K The type of the keys being read.
+   * @tparam V The type of the values being read.
+   * @return A PCollection containing data read from the specified source.
+   */
+  def read[K, V](source: TableSource[K, V]): PTable[K, V] = {
+    pipeline.read(source)
+  }
+
+  /**
+   * Reads a source into a [[org.apache.scrunch.PCollection]]
+   *
+   * @param source The source to read from.
+   * @tparam T The type of the values being read.
+   * @return A PCollection containing data read from the specified source.
+   */
+  def load[T](source: Source[T]): PCollection[T] = {
+    read(source)
+  }
+
+  /**
+   * Reads a source into a [[org.apache.scrunch.PTable]]
+   *
+   * @param source The source to read from.
+   * @tparam K The type of the keys being read.
+   * @tparam V The type of the values being read.
+   * @return A PCollection containing data read from the specified source.
+   */
+  def load[K, V](source: TableSource[K, V]): PTable[K, V] = {
+    read(source)
+  }
+
+  /**
+   * Writes a parallel collection to a target.
+   *
+   * @param collection The collection to write.
+   * @param target The destination target for this write.
+   */
+  def write(collection: PCollection[_], target: Target) {
+    pipeline.write(collection, target)
+  }
+
+  /**
+   * Writes a parallel table to a target.
+   *
+   * @param table The table to write.
+   * @param target The destination target for this write.
+   */
+  def write(table: PTable[_, _], target: Target) {
+    pipeline.write(table, target)
+  }
+
+  /**
+   * Writes a parallel collection to a target.
+   *
+   * @param collection The collection to write.
+   * @param target The destination target for this write.
+   */
+  def store(collection: PCollection[_], target: Target) {
+    write(collection, target)
+  }
+
+  /**
+   * Writes a parallel table to a target.
+   *
+   * @param table The table to write.
+   * @param target The destination target for this write.
+   */
+  def store(table: PTable[_, _], target: Target) {
+    write(table, target)
+  }
+
+  /**
+   * Constructs and executes a series of MapReduce jobs in order
+   * to write data to the output targets.
+   */
+  def run() {
+    pipeline.run
+  }
+
+  /**
+   * Run any remaining jobs required to generate outputs and then
+   * clean up any intermediate data files that were created in
+   * this run or previous calls to `run`.
+   */
+  def done() {
+    pipeline.done
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/IO.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/IO.scala b/scrunch/src/main/scala/org/apache/scrunch/IO.scala
new file mode 100644
index 0000000..4886e6a
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/IO.scala
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from, To => to, At => at}
+import org.apache.crunch.types.avro.AvroType
+import org.apache.hadoop.fs.Path;
+
+object From {
+  def avroFile[T](path: String, atype: AvroType[T]) = from.avroFile(path, atype)
+  def avroFile[T](path: Path, atype: AvroType[T]) = from.avroFile(path, atype)
+  def textFile(path: String) = from.textFile(path)
+  def textFile(path: Path) = from.textFile(path)
+}
+
+object To {
+  def avroFile[T](path: String) = to.avroFile(path)
+  def avroFile[T](path: Path) = to.avroFile(path)
+  def textFile(path: String) = to.textFile(path)
+  def textFile(path: Path) = to.textFile(path)
+}
+
+object At {
+  def avroFile[T](path: String, atype: AvroType[T]) = at.avroFile(path, atype)
+  def avroFile[T](path: Path, atype: AvroType[T]) = at.avroFile(path, atype)
+  def textFile(path: String) = at.textFile(path)
+  def textFile(path: Path) = at.textFile(path)
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Mem.scala b/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
new file mode 100644
index 0000000..1c4f233
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/Mem.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import java.lang.{Iterable => JIterable}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.crunch.{Pair => P}
+import org.apache.crunch.{Source, TableSource, Target}
+import org.apache.crunch.impl.mem.MemPipeline
+import org.apache.scrunch.Conversions._
+
+/**
+ * Object for working with in-memory PCollection and PTable instances.
+ */
+object Mem extends MemEmbeddedPipeline with PipelineHelper {
+  private val ptf = Avros
+
+  /**
+   * Constructs a PCollection using in memory data.
+   *
+   * @param collect The data to load.
+   * @return A PCollection containing the specified data.
+   */
+  def collectionOf[T](ts: T*)(implicit pt: PTypeH[T]): PCollection[T] = {
+    collectionOf(List(ts:_*))
+  }
+
+  /**
+   * Constructs a PCollection using in memory data.
+   *
+   * @param collect The data to load.
+   * @return A PCollection containing the specified data.
+   */
+  def collectionOf[T](collect: Iterable[T])(implicit pt: PTypeH[T]): PCollection[T] = {
+    val native = MemPipeline.typedCollectionOf(pt.get(ptf), asJavaIterable(collect))
+    new PCollection[T](native)
+  }
+
+  /**
+   * Constructs a PTable using in memory data.
+   *
+   * @param pairs The data to load.
+   * @return A PTable containing the specified data.
+   */
+  def tableOf[K, V](pairs: (K, V)*)(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
+    tableOf(List(pairs:_*))
+  }
+
+  /**
+   * Constructs a PTable using in memory data.
+   *
+   * @param pairs The data to load.
+   * @return A PTable containing the specified data.
+   */
+  def tableOf[K, V](pairs: Iterable[(K, V)])(implicit pk: PTypeH[K], pv: PTypeH[V]): PTable[K, V] = {
+    val cpairs = pairs.map(kv => P.of(kv._1, kv._2))
+    val ptype = ptf.tableOf(pk.get(ptf), pv.get(ptf))
+    new PTable[K, V](MemPipeline.typedTableOf(ptype, asJavaIterable(cpairs)))
+  }
+
+  /** Contains factory methods used to create `Source`s. */
+  val from = From
+
+  /** Contains factory methods used to create `Target`s. */
+  val to = To
+
+  /** Contains factory methods used to create `SourceTarget`s. */
+  val at = At
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
new file mode 100644
index 0000000..04a2e91
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PCollection.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
+import org.apache.crunch.{PCollection => JCollection, PTable => JTable, Pair => CPair, Target}
+import org.apache.crunch.lib.Aggregate
+import org.apache.scrunch.Conversions._
+import scala.collection.JavaConversions
+
+/**
+ * Scala wrapper around a Crunch PCollection, exposing functional
+ * collection-style operations (filter/map/flatMap/...) that are translated
+ * into Crunch DoFns on the wrapped native collection.
+ */
+class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCollection[S], JCollection[S]] {
+  import PCollection._
+
+  /** Keeps only the elements for which `f` returns true. */
+  def filter(f: S => Boolean): PCollection[S] = {
+    parallelDo(filterFn[S](f), native.getPType())
+  }
+
+  /**
+   * Applies `f` to every element. The result type `To` is chosen by the
+   * implicit `CanParallelTransform` (mapping to a pair can yield a PTable).
+   */
+  def map[T, To](f: S => T)(implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, mapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /** Applies `f` to every element and flattens the resulting collections. */
+  def flatMap[T, To](f: S => Traversable[T])
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /** Returns the union of this collection with all of `others`. */
+  def union(others: PCollection[S]*) = {
+    new PCollection[S](native.union(others.map(_.native) : _*))
+  }
+
+  /** Keys each element by `f(element)`, producing a PTable of (key, element). */
+  def by[K: PTypeH](f: S => K): PTable[K, S] = {
+    val ptype = getTypeFamily().tableOf(implicitly[PTypeH[K]].get(getTypeFamily()), native.getPType())
+    parallelDo(mapKeyFn[S, K](f), ptype) 
+  }
+
+  /** Groups elements by `f(element)`; equivalent to `by(f).groupByKey`. */
+  def groupBy[K: PTypeH](f: S => K): PGroupedTable[K, S] = {
+    by(f).groupByKey
+  }
+  
+  /** Returns the collection's contents as a Scala Iterable (may trigger pipeline execution). */
+  def materialize() = JavaConversions.iterableAsScalaIterable[S](native.materialize)
+
+  /** Rewraps a native PCollection produced by the underlying API (unchecked cast). */
+  def wrap(newNative: AnyRef) = new PCollection[S](newNative.asInstanceOf[JCollection[S]])
+  
+  /** Counts each distinct element via Aggregate.count, converting Java Longs to Scala Longs. */
+  def count() = {
+    val count = new PTable[S, java.lang.Long](Aggregate.count(native))
+    count.mapValues(_.longValue()) 
+  }
+
+  /** Applies Aggregate.max to the underlying collection. */
+  def max() = wrap(Aggregate.max(native))
+}
+
+/** Adapts a Scala function producing a Traversable into a Crunch DoFn. */
+trait SDoFn[S, T] extends DoFn[S, T] with Function1[S, Traversable[T]] {
+  override def process(input: S, emitter: Emitter[T]) {
+    apply(input).foreach(emitter.emit(_))
+  }
+}
+
+/** Adapts a Scala predicate into a Crunch FilterFn. */
+trait SFilterFn[T] extends FilterFn[T] with Function1[T, Boolean] {
+  override def accept(input: T): Boolean = apply(input)
+}
+
+/** Adapts a Scala function into a Crunch MapFn. */
+trait SMapFn[S, T] extends MapFn[S, T] with Function1[S, T] {
+  override def map(input: S): T = apply(input)
+}
+
+/** Keys each element with a Scala function, emitting (key, element) pairs. */
+trait SMapKeyFn[S, K] extends MapFn[S, CPair[K, S]] with Function1[S, K] {
+  override def map(input: S): CPair[K, S] = CPair.of(apply(input), input)
+}
+
+/** Factory methods that wrap Scala closures in serializable Crunch function objects. */
+object PCollection {
+  def filterFn[S](fn: S => Boolean) = new SFilterFn[S] {
+    def apply(x: S) = fn(x)
+  }
+
+  def mapKeyFn[S, K](fn: S => K) = new SMapKeyFn[S, K] {
+    def apply(x: S) = fn(x)
+  }
+
+  def mapFn[S, T](fn: S => T) = new SMapFn[S, T] {
+    def apply(s: S) = fn(s)
+  }
+
+  def flatMapFn[S, T](fn: S => Traversable[T]) = new SDoFn[S, T] {
+    def apply(s: S) = fn(s)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
new file mode 100644
index 0000000..e912e60
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PCollectionLike.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.DoFn
+import org.apache.crunch.{PCollection => JCollection, Pair => JPair, Target}
+import org.apache.crunch.types.{PType, PTableType}
+
+/**
+ * Common behavior shared by the Scala PCollection/PTable/PGroupedTable wrappers.
+ *
+ * @tparam S          Element type of the underlying collection.
+ * @tparam FullType   Concrete wrapper type returned by operations such as `write`.
+ * @tparam NativeType Type of the wrapped Crunch collection.
+ */
+trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
+  /** The wrapped Crunch collection. */
+  val native: NativeType
+  
+  /** Wraps a native collection (passed as AnyRef) back into the Scala wrapper type. */
+  def wrap(newNative: AnyRef): FullType
+  
+  /** Writes this collection to the given target and returns the wrapper. */
+  def write(target: Target): FullType = wrap(native.write(target))
+
+  /** Applies `fn` to each element, producing a PCollection of the given ptype. */
+  def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
+    new PCollection[T](native.parallelDo(fn, ptype))
+  }
+
+  /** Named variant of `parallelDo` producing a PCollection. */
+  def parallelDo[T](name: String, fn: DoFn[S,T], ptype: PType[T]) = {
+    new PCollection[T](native.parallelDo(name, fn, ptype))
+  }
+
+  /** Applies `fn` to each element, producing a PTable of the given table type. */
+  def parallelDo[K, V](fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
+    new PTable[K, V](native.parallelDo(fn, ptype))
+  }
+
+  /** Named variant of `parallelDo` producing a PTable. */
+  def parallelDo[K, V](name: String, fn: DoFn[S, JPair[K, V]], ptype: PTableType[K, V]) = {
+    new PTable[K, V](native.parallelDo(name, fn, ptype))
+  }
+  
+  /** Type family used to construct PTypes; Avro-based by default. */
+  def getTypeFamily() = Avros
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
new file mode 100644
index 0000000..f4500a5
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PGroupedTable.scala
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
+import org.apache.crunch.{CombineFn, PGroupedTable => JGroupedTable, PTable => JTable, Pair => CPair}
+import java.lang.{Iterable => JIterable}
+import scala.collection.{Iterable, Iterator}
+import scala.collection.JavaConversions._
+import Conversions._
+
+/**
+ * Scala wrapper around a Crunch PGroupedTable: pairs of (key, iterable of
+ * values) produced by a groupByKey, supporting per-group transformations
+ * and value combining.
+ */
+class PGroupedTable[K, V](val native: JGroupedTable[K, V])
+    extends PCollectionLike[CPair[K, JIterable[V]], PGroupedTable[K, V], JGroupedTable[K, V]] {
+  import PGroupedTable._
+
+  /** Keeps only the (key, values) groups for which `f` returns true. */
+  def filter(f: (K, Iterable[V]) => Boolean) = {
+    parallelDo(filterFn[K, V](f), native.getPType())
+  }
+
+  /** Maps each (key, values) group to a single result. */
+  def map[T, To](f: (K, Iterable[V]) => T)
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, mapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /** Maps each (key, values) group to zero or more results, flattened. */
+  def flatMap[T, To](f: (K, Iterable[V]) => Traversable[T])
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /** Reduces each group's values to one value with `f`. */
+  def combine(f: Iterable[V] => V) = combineValues(new IterableCombineFn[K, V](f))
+
+  /** Combines each group's values with the given Crunch CombineFn. */
+  def combineValues(fn: CombineFn[K, V]) = new PTable[K, V](native.combineValues(fn))
+
+  /** Flattens the grouping back into a PTable of individual (key, value) pairs. */
+  def ungroup() = new PTable[K, V](native.ungroup())
+  
+  /** Rewraps a native grouped table (unchecked cast). */
+  def wrap(newNative: AnyRef): PGroupedTable[K, V] = {
+    new PGroupedTable[K, V](newNative.asInstanceOf[JGroupedTable[K, V]])
+  }
+}
+
+/** A CombineFn that reduces each group's values with a Scala function. */
+class IterableCombineFn[K, V](f: Iterable[V] => V) extends CombineFn[K, V] {
+  override def process(input: CPair[K, JIterable[V]], emitfn: Emitter[CPair[K, V]]) = {
+    val combined = f(iterableAsScalaIterable[V](input.second()))
+    emitfn.emit(CPair.of(input.first(), combined))
+  }
+}
+
+/** Adapts a Scala (key, values) predicate into a Crunch FilterFn over grouped pairs. */
+trait SFilterGroupedFn[K, V] extends FilterFn[CPair[K, JIterable[V]]] with Function2[K, Iterable[V], Boolean] {
+  override def accept(input: CPair[K, JIterable[V]]): Boolean = {
+    apply(input.first(), iterableAsScalaIterable[V](input.second()))
+  }
+}
+
+/** Adapts a Scala (key, values) function producing a Traversable into a Crunch DoFn. */
+trait SDoGroupedFn[K, V, T] extends DoFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], Traversable[T]] {
+  override def process(input: CPair[K, JIterable[V]], emitter: Emitter[T]) {
+    val results = apply(input.first(), iterableAsScalaIterable[V](input.second()))
+    results.foreach(emitter.emit(_))
+  }
+}
+
+/** Adapts a Scala (key, values) function into a Crunch MapFn over grouped pairs. */
+trait SMapGroupedFn[K, V, T] extends MapFn[CPair[K, JIterable[V]], T] with Function2[K, Iterable[V], T] {
+  override def map(input: CPair[K, JIterable[V]]): T =
+    apply(input.first(), iterableAsScalaIterable[V](input.second()))
+}
+
+/** Factories that wrap Scala closures in serializable grouped-table function objects. */
+object PGroupedTable {
+  def filterFn[K, V](fn: (K, Iterable[V]) => Boolean) = new SFilterGroupedFn[K, V] {
+    def apply(k: K, v: Iterable[V]) = fn(k, v)
+  }
+
+  def mapFn[K, V, T](fn: (K, Iterable[V]) => T) = new SMapGroupedFn[K, V, T] {
+    def apply(k: K, v: Iterable[V]) = fn(k, v)
+  }
+
+  def flatMapFn[K, V, T](fn: (K, Iterable[V]) => Traversable[T]) = new SDoGroupedFn[K, V, T] {
+    def apply(k: K, v: Iterable[V]) = fn(k, v)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTable.scala b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
new file mode 100644
index 0000000..5337929
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PTable.scala
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
+import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
+import org.apache.crunch.lib.{Join, Aggregate, Cogroup, PTables}
+import java.util.{Collection => JCollect}
+import scala.collection.JavaConversions._
+
+/**
+ * Scala wrapper around a Crunch PTable of (key, value) pairs, adding
+ * Scala-friendly transformations, joins, cogrouping and grouping.
+ */
+class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V], PTable[K, V], JTable[K, V]] {
+  import PTable._
+
+  /** Keeps only the entries for which `f(key, value)` returns true. */
+  def filter(f: (K, V) => Boolean): PTable[K, V] = {
+    parallelDo(filterFn[K, V](f), native.getPTableType())
+  }
+
+  /** Maps each (key, value) entry to a single result. */
+  def map[T, To](f: (K, V) => T)
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, mapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /** Transforms only the values, keeping keys and the key type unchanged. */
+  def mapValues[T](f: V => T)(implicit pt: PTypeH[T]) = {
+    val ptf = getTypeFamily()
+    val ptype = ptf.tableOf(native.getKeyType(), pt.get(ptf))
+    parallelDo(mapValuesFn[K, V, T](f), ptype)
+  }
+
+  /** Maps each entry to zero or more results, flattened. */
+  def flatMap[T, To](f: (K, V) => Traversable[T])
+      (implicit pt: PTypeH[T], b: CanParallelTransform[T, To]): To = {
+    b(this, flatMapFn(f), pt.get(getTypeFamily()))
+  }
+
+  /** Returns the union of this table with all of `others`. */
+  def union(others: PTable[K, V]*) = {
+    new PTable[K, V](native.union(others.map(_.native) : _*))
+  }
+
+  /** All keys of this table, as a PCollection. */
+  def keys() = new PCollection[K](PTables.keys(native))
+
+  /** All values of this table, as a PCollection. */
+  def values() = new PCollection[V](PTables.values(native))
+
+  /**
+   * Cogroups this table with `other` on their keys, converting the Java
+   * collections produced by Crunch's Cogroup into Scala Iterables.
+   */
+  def cogroup[V2](other: PTable[K, V2]) = {
+    val jres = Cogroup.cogroup[K, V, V2](this.native, other.native)
+    val ptf = getTypeFamily()
+    val inter = new PTable[K, CPair[JCollect[V], JCollect[V2]]](jres)
+    inter.parallelDo(new SMapTableValuesFn[K, CPair[JCollect[V], JCollect[V2]], (Iterable[V], Iterable[V2])] {
+      def apply(x: CPair[JCollect[V], JCollect[V2]]) = {
+        (collectionAsScalaIterable[V](x.first()), collectionAsScalaIterable[V2](x.second()))
+      }
+    }, ptf.tableOf(keyType, ptf.tuple2(ptf.collections(valueType), ptf.collections(other.valueType))))
+  }
+
+  /** Shape of the native join strategies accepted by the protected `join` below. */
+  type JoinFn[V2] = (JTable[K, V], JTable[K, V2]) => JTable[K, CPair[V, V2]]
+
+  /** Runs `joinFn` on the native tables, then converts CPair values to Scala tuples. */
+  protected def join[V2](joinFn: JoinFn[V2], other: PTable[K, V2]): PTable[K, (V, V2)] = {
+    val jres = joinFn(this.native, other.native)
+    val ptf = getTypeFamily()
+    val ptype = ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType))
+    val inter = new PTable[K, CPair[V, V2]](jres)
+    inter.parallelDo(new SMapTableValuesFn[K, CPair[V, V2], (V, V2)] {
+      def apply(x: CPair[V, V2]) = (x.first(), x.second())
+    }, ptype)
+  }
+
+  /** Alias for `innerJoin`. */
+  def join[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
+    innerJoin(other)
+  }
+
+  /** Inner join on keys, via Crunch's Join.innerJoin. */
+  def innerJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
+    join[V2](Join.innerJoin[K, V, V2](_, _), other)
+  }
+
+  /** Left outer join on keys, via Crunch's Join.leftJoin. */
+  def leftJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
+    join[V2](Join.leftJoin[K, V, V2](_, _), other)
+  }
+
+  /** Right outer join on keys, via Crunch's Join.rightJoin. */
+  def rightJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
+    join[V2](Join.rightJoin[K, V, V2](_, _), other)
+  }
+
+  /** Full outer join on keys, via Crunch's Join.fullJoin. */
+  def fullJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
+    join[V2](Join.fullJoin[K, V, V2](_, _), other)
+  }
+
+  /** Selects `limit` entries via Aggregate.top; `maximize` picks the ordering direction. */
+  def top(limit: Int, maximize: Boolean) = {
+    wrap(Aggregate.top(this.native, limit, maximize))
+  }
+
+  /** Groups the table by key. */
+  def groupByKey() = new PGroupedTable(native.groupByKey())
+
+  /** Groups the table by key using the given number of partitions. */
+  def groupByKey(partitions: Int) = new PGroupedTable(native.groupByKey(partitions))
+
+  /** Groups the table by key with explicit grouping options. */
+  def groupByKey(options: GroupingOptions) = new PGroupedTable(native.groupByKey(options))
+
+  /** Rewraps a native table (unchecked cast). */
+  def wrap(newNative: AnyRef) = {
+    new PTable[K, V](newNative.asInstanceOf[JTable[K, V]])
+  }
+ 
+  /** Unwraps a Scala PTable back to the underlying Crunch table. */
+  def unwrap(sc: PTable[K, V]): JTable[K, V] = sc.native
+ 
+  /** Returns the table's contents as a lazy view of Scala pairs (may trigger pipeline execution). */
+  def materialize(): Iterable[(K, V)] = {
+    native.materialize.view.map(x => (x.first, x.second))
+  }
+
+  /** PType of this table's keys. */
+  def keyType() = native.getPTableType().getKeyType()
+
+  /** PType of this table's values. */
+  def valueType() = native.getPTableType().getValueType()
+}
+
+/** Adapts a Scala (key, value) predicate into a Crunch FilterFn over pairs. */
+trait SFilterTableFn[K, V] extends FilterFn[CPair[K, V]] with Function2[K, V, Boolean] {
+  override def accept(input: CPair[K, V]): Boolean = apply(input.first(), input.second())
+}
+
+/** Adapts a Scala (key, value) function producing a Traversable into a Crunch DoFn. */
+trait SDoTableFn[K, V, T] extends DoFn[CPair[K, V], T] with Function2[K, V, Traversable[T]] {
+  override def process(input: CPair[K, V], emitter: Emitter[T]) {
+    apply(input.first(), input.second()).foreach(emitter.emit(_))
+  }
+}
+
+/** Adapts a Scala (key, value) function into a Crunch MapFn over pairs. */
+trait SMapTableFn[K, V, T] extends MapFn[CPair[K, V], T] with Function2[K, V, T] {
+  override def map(input: CPair[K, V]): T = apply(input.first(), input.second())
+}
+
+/** Applies a Scala function to the value of each pair, leaving the key intact. */
+trait SMapTableValuesFn[K, V, T] extends MapFn[CPair[K, V], CPair[K, T]] with Function1[V, T] {
+  override def map(input: CPair[K, V]): CPair[K, T] = CPair.of(input.first(), apply(input.second()))
+}
+
+/** Factories that wrap Scala closures in serializable table function objects. */
+object PTable {
+  def filterFn[K, V](fn: (K, V) => Boolean) = new SFilterTableFn[K, V] {
+    def apply(k: K, v: V) = fn(k, v)
+  }
+
+  def mapValuesFn[K, V, T](fn: V => T) = new SMapTableValuesFn[K, V, T] {
+    def apply(v: V) = fn(v)
+  }
+
+  def mapFn[K, V, T](fn: (K, V) => T) = new SMapTableFn[K, V, T] {
+    def apply(k: K, v: V) = fn(k, v)
+  }
+
+  def flatMapFn[K, V, T](fn: (K, V) => Traversable[T]) = new SDoTableFn[K, V, T] {
+    def apply(k: K, v: V) = fn(k, v)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala b/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
new file mode 100644
index 0000000..1bd3db6
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PTypeFamily.scala
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.{Pair => CPair, Tuple3 => CTuple3, Tuple4 => CTuple4, MapFn}
+import org.apache.crunch.types.{PType, PTypeFamily => PTF}
+import org.apache.crunch.types.writable.WritableTypeFamily
+import org.apache.crunch.types.avro.{AvroTypeFamily, Avros => CAvros}
+import java.lang.{Long => JLong, Double => JDouble, Integer => JInt, Float => JFloat, Boolean => JBoolean}
+import java.util.{Collection => JCollection}
+import scala.collection.JavaConversions._
+
+/** A MapFn backed by an arbitrary Scala function. */
+class TMapFn[S, T](f: S => T) extends MapFn[S, T] {
+  override def map(input: S): T = f(input)
+}
+
+/**
+ * Scala-friendly facade over a Crunch PTypeFamily: exposes PTypes for Scala
+ * primitive, tuple and collection types by deriving them from the
+ * corresponding Java-typed PTypes with in/out conversion functions.
+ */
+trait PTypeFamily {
+
+  /** The underlying Crunch type family (Writable- or Avro-based). */
+  def ptf: PTF
+
+  /** PType for strings. */
+  val strings = ptf.strings()
+
+  /** PType for raw bytes. */
+  val bytes = ptf.bytes()
+
+  /** Record PType for the class captured by the ClassManifest. */
+  def records[T: ClassManifest] = ptf.records(classManifest[T].erasure)
+
+  /** Builds a PType for `T` from a PType for `S` plus in/out conversion functions. */
+  def derived[S, T](cls: java.lang.Class[T], in: S => T, out: T => S, pt: PType[S]) = {
+    ptf.derived(cls, new TMapFn[S, T](in), new TMapFn[T, S](out), pt)
+  }
+
+  /** PType for Scala Long, derived from java.lang.Long. */
+  val longs = {
+    val in = (x: JLong) => x.longValue()
+    val out = (x: Long) => new JLong(x)
+    derived(classOf[Long], in, out, ptf.longs())
+  }
+
+  /** PType for Scala Int, derived from java.lang.Integer. */
+  val ints = {
+    val in = (x: JInt) => x.intValue()
+    val out = (x: Int) => new JInt(x)
+    derived(classOf[Int], in, out, ptf.ints())
+  }
+
+  /** PType for Scala Float, derived from java.lang.Float. */
+  val floats = {
+    val in = (x: JFloat) => x.floatValue()
+    val out = (x: Float) => new JFloat(x)
+    derived(classOf[Float], in, out, ptf.floats())
+  }
+
+  /** PType for Scala Double, derived from java.lang.Double. */
+  val doubles = {
+    val in = (x: JDouble) => x.doubleValue()
+    val out = (x: Double) => new JDouble(x)
+    derived(classOf[Double], in, out, ptf.doubles())
+  }
+
+  /** PType for Scala Boolean, derived from java.lang.Boolean. */
+  val booleans = {
+    val in = (x: JBoolean) => x.booleanValue()
+    val out = (x: Boolean) => new JBoolean(x)
+    derived(classOf[Boolean], in, out, ptf.booleans())
+  }
+
+  /** PType for a Scala Iterable, derived from a Java collection PType. */
+  def collections[T](ptype: PType[T]) = {
+    derived(classOf[Iterable[T]], collectionAsScalaIterable[T], asJavaCollection[T], ptf.collections(ptype))
+  }
+
+  /** PType for a Scala Map with String keys, derived from a Java map PType. */
+  def maps[T](ptype: PType[T]) = {
+    derived(classOf[scala.collection.Map[String, T]], mapAsScalaMap[String, T], mapAsJavaMap[String, T], ptf.maps(ptype))
+  }
+
+  /** PType for a Scala List, derived from a Java collection PType. */
+  def lists[T](ptype: PType[T]) = {
+    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toList
+    val out = (x: List[T]) => asJavaCollection[T](x)
+    derived(classOf[List[T]], in, out, ptf.collections(ptype))
+  }
+
+  /** PType for a Scala Set, derived from a Java collection PType (duplicates collapse on read). */
+  def sets[T](ptype: PType[T]) = {
+    val in = (x: JCollection[T]) => collectionAsScalaIterable[T](x).toSet
+    val out = (x: Set[T]) => asJavaCollection[T](x)
+    derived(classOf[Set[T]], in, out, ptf.collections(ptype))
+  }
+
+  /** PType for a Scala pair, derived from a Crunch Pair PType. */
+  def tuple2[T1, T2](p1: PType[T1], p2: PType[T2]) = {
+    val in = (x: CPair[T1, T2]) => (x.first(), x.second())
+    val out = (x: (T1, T2)) => CPair.of(x._1, x._2)
+    derived(classOf[(T1, T2)], in, out, ptf.pairs(p1, p2))
+  }
+
+  /** PType for a Scala 3-tuple, derived from a Crunch Tuple3 PType. */
+  def tuple3[T1, T2, T3](p1: PType[T1], p2: PType[T2], p3: PType[T3]) = {
+    val in = (x: CTuple3[T1, T2, T3]) => (x.first(), x.second(), x.third())
+    val out = (x: (T1, T2, T3)) => CTuple3.of(x._1, x._2, x._3)
+    derived(classOf[(T1, T2, T3)], in, out, ptf.triples(p1, p2, p3))
+  }
+
+  /** PType for a Scala 4-tuple, derived from a Crunch Tuple4 PType. */
+  def tuple4[T1, T2, T3, T4](p1: PType[T1], p2: PType[T2], p3: PType[T3], p4: PType[T4]) = {
+    val in = (x: CTuple4[T1, T2, T3, T4]) => (x.first(), x.second(), x.third(), x.fourth())
+    val out = (x: (T1, T2, T3, T4)) => CTuple4.of(x._1, x._2, x._3, x._4)
+    derived(classOf[(T1, T2, T3, T4)], in, out, ptf.quads(p1, p2, p3, p4))
+  }
+
+  /** Table PType pairing the given key and value PTypes. */
+  def tableOf[K, V](keyType: PType[K], valueType: PType[V]) = ptf.tableOf(keyType, valueType)
+}
+
+/** PTypeFamily backed by Hadoop Writable serialization. */
+object Writables extends PTypeFamily {
+  override def ptf = WritableTypeFamily.getInstance()
+}
+
+/** PTypeFamily backed by Avro serialization. */
+object Avros extends PTypeFamily {
+  override def ptf = AvroTypeFamily.getInstance()
+
+  // Side effect at object initialization: install the Scala-aware reflect
+  // data factory so Avro reflection handles Scala classes.
+  CAvros.REFLECT_DATA_FACTORY = new ScalaReflectDataFactory()
+
+  /** Reflection-based Avro PType for the class captured by the ClassManifest. */
+  def reflects[T: ClassManifest]() = CAvros.reflects(classManifest[T].erasure)
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala b/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
new file mode 100644
index 0000000..a642afc
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/Pipeline.scala
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import java.lang.Class
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.crunch.{Pipeline => JPipeline}
+import org.apache.crunch.impl.mem.MemPipeline
+import org.apache.crunch.impl.mr.MRPipeline
+
+/**
+ * Manages the state of a pipeline execution.
+ *
+ * ==Overview==
+ * There are two subtypes of [[org.apache.crunch.Pipeline]]:
+ * [[org.apache.crunch.Pipeline#MapReduce]] - for jobs run on a Hadoop cluster.
+ * [[org.apache.crunch.Pipeline#InMemory]] - for jobs run in memory.
+ *
+ * To create a Hadoop pipeline:
+ * {{{
+ * import org.apache.scrunch.Pipeline
+ *
+ * Pipeline.mapreduce[MyClass]
+ * }}}
+ *
+ * To get an in memory pipeline:
+ * {{{
+ * import org.apache.scrunch.Pipeline
+ *
+ * Pipeline.inMemory
+ * }}}
+ */
+class Pipeline(val jpipeline: JPipeline) extends PipelineLike {
+  /**
+   * A convenience method for reading a text file.
+   *
+   * @param pathName Path to desired text file.
+   * @return A PCollection containing the lines in the specified file.
+   */
+  def readTextFile(pathName: String): PCollection[String] = {
+    val lines = jpipeline.readTextFile(pathName)
+    new PCollection[String](lines)
+  }
+
+  /**
+   * A convenience method for writing a text file.
+   *
+   * @param pcollect A PCollection to write to text.
+   * @param pathName Path to desired output text file.
+   */
+  def writeTextFile[T](pcollect: PCollection[T], pathName: String) {
+    jpipeline.writeTextFile(pcollect.native, pathName)
+  }
+}
+
+/**
+ * Companion object. Contains subclasses of Pipeline and factory methods
+ * for constructing MapReduce or in-memory pipelines.
+ */
+object Pipeline {
+  /**
+   * Pipeline for running jobs on a hadoop cluster.
+   *
+   * @param clazz Type of the class using the pipeline.
+   * @param configuration Hadoop configuration to use.
+   */
+  class MapReducePipeline (clazz: Class[_], configuration: Configuration)
+    extends Pipeline(new MRPipeline(clazz, configuration))
+
+  /**
+   * Pipeline for running jobs in memory (singleton wrapping MemPipeline).
+   */
+  object InMemoryPipeline extends Pipeline(MemPipeline.getInstance())
+
+  /**
+   * Creates a pipeline for running jobs on a hadoop cluster using the default configuration.
+   *
+   * @param clazz Type of the class using the pipeline.
+   */
+  def mapReduce(clazz: Class[_]): MapReducePipeline = mapReduce(clazz, new Configuration())
+
+  /**
+   * Creates a pipeline for running jobs on a hadoop cluster.
+   *
+   * @param clazz Type of the class using the pipeline.
+   * @param configuration Hadoop configuration to use.
+   */
+  def mapReduce(clazz: Class[_], configuration: Configuration): MapReducePipeline = {
+    new MapReducePipeline(clazz, configuration)
+  }
+
+  /**
+   * Creates a pipeline for running jobs on a hadoop cluster using the default configuration.
+   *
+   * @tparam T Type of the class using the pipeline.
+   */
+  def mapReduce[T : ClassManifest]: MapReducePipeline = mapReduce[T](new Configuration())
+
+  /**
+   * Creates a pipeline for running jobs on a hadoop cluster.
+   *
+   * @param configuration Hadoop configuration to use.
+   * @tparam T Type of the class using the pipeline.
+   */
+  def mapReduce[T : ClassManifest](configuration: Configuration): MapReducePipeline = {
+    new MapReducePipeline(implicitly[ClassManifest[T]].erasure, configuration)
+  }
+
+  /**
+   * Gets a pipeline for running jobs in memory.
+   */
+  def inMemory: InMemoryPipeline.type = InMemoryPipeline
+
+  /**
+   * Creates a new Pipeline according to the provided specifications.
+   *
+   * @param configuration Configuration for connecting to a Hadoop cluster.
+   * @param memory Option specifying whether or not the pipeline is an in memory or mapreduce pipeline.
+   * @param manifest ClassManifest for the class using the pipeline.
+   * @tparam T type of the class using the pipeline.
+   * @deprecated Use either {{{Pipeline.mapReduce(class, conf)}}} or {{{Pipeline.inMemory}}}
+   *
+   * NOTE(review): consider adding the `@deprecated` annotation so callers get
+   * a compiler warning, not just a Scaladoc tag.
+   */
+  def apply[T](
+    configuration: Configuration = new Configuration(),
+    memory: Boolean = false)(implicit manifest: ClassManifest[T]
+  ): Pipeline = if (memory) inMemory else mapReduce(manifest.erasure, configuration)
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
new file mode 100644
index 0000000..b427f20
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PipelineApp.scala
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import java.io.Serializable
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.util.GenericOptionsParser
+
+import org.apache.crunch.{Source, TableSource, Target}
+
+/**
+ * Base trait for writing scrunch applications as scripts: the constructor
+ * body of the implementing object is captured via DelayedInit and executed
+ * from `main` after Hadoop command-line options have been parsed.
+ */
+trait PipelineApp extends MREmbeddedPipeline with PipelineHelper with DelayedInit {
+  /** Allows plain strings to be used where a Hadoop Path is expected. */
+  implicit def _string2path(str: String) = new Path(str)
+
+  /** Contains factory methods used to create `Source`s. */
+  val from = From
+
+  /** Contains factory methods used to create `Target`s. */
+  val to = To
+
+  /** Contains factory methods used to create `SourceTarget`s. */
+  val at = At
+
+  // Deferred constructor bodies collected by delayedInit; run from main().
+  private val initCode = new ListBuffer[() => Unit]
+
+  // Remaining (non-Hadoop) command-line args; set in main() before initCode runs.
+  private var _args: Array[String] = _
+
+  /** Command-line arguments passed to this application. */
+  protected def args: Array[String] = _args
+
+  /** Hadoop configuration of the embedded pipeline. */
+  def configuration: Configuration = pipeline.getConfiguration
+
+  /** Gets the distributed filesystem associated with this application's configuration. */
+  def fs: FileSystem = FileSystem.get(configuration)
+
+  // DelayedInit hook: capture each constructor body instead of running it
+  // immediately, so main() controls when the application logic executes.
+  override def delayedInit(body: => Unit) {
+    initCode += (() => body)
+  }
+
+  def main(args: Array[String]) = {
+    // GenericOptionsParser consumes standard Hadoop options first; the rest
+    // become this application's args.
+    val parser = new GenericOptionsParser(configuration, args)
+    _args = parser.getRemainingArgs()
+    for (proc <- initCode) proc()
+    done  // presumably finalizes the pipeline run (defined in MREmbeddedPipeline) -- confirm
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
new file mode 100644
index 0000000..cdeb37b
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PipelineHelper.scala
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+/**
+ * This trait provides convenience methods for building pipelines.
+ */
+trait PipelineHelper {
+  /**
+   * Materializes the specified PCollection and displays its contents.
+   */
+  def dump(data: PCollection[_]) {
+    data.materialize.foreach(println(_))
+  }
+
+  /**
+   * Materializes the specified PTable and displays its contents.
+   */
+  def dump(data: PTable[_, _]) {
+    data.materialize.foreach(println(_))
+  }
+
+  /**
+   * Performs a cogroup on the two specified PTables.
+   *
+   * @return A PTable from each key to the values it maps to in `t1` and `t2`.
+   */
+  def cogroup[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
+      : PTable[K, (Iterable[V1], Iterable[V2])] = {
+    t1.cogroup(t2)
+  }
+
+  /**
+   * Performs an innerjoin on the two specified PTables.
+   *
+   * @return A PTable pairing the values of matching keys from `t1` and `t2`.
+   */
+  def join[K : PTypeH, V1 : PTypeH, V2 : PTypeH](t1: PTable[K, V1], t2: PTable[K, V2])
+      : PTable[K, (V1, V2)] = {
+    t1.join(t2)
+  }
+
+  /**
+   * Unions the specified PCollections.
+   */
+  def union[T](first: PCollection[T], others: PCollection[T]*)
+      : PCollection[T] = {
+    first.union(others: _*)
+  }
+
+  /**
+   * Unions the specified PTables.
+   */
+  def union[K, V](first: PTable[K, V], others: PTable[K, V]*)
+      : PTable[K, V] = {
+    first.union(others: _*)
+  }
+}
+
+/**
+ * Companion object containing convenience methods for building pipelines.
+ */
+object PipelineHelper extends PipelineHelper

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala b/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
new file mode 100644
index 0000000..4062fdc
--- /dev/null
+++ b/scrunch/src/main/scala/org/apache/scrunch/PipelineLike.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.crunch.{Pipeline => JPipeline}
+import org.apache.crunch.Source
+import org.apache.crunch.TableSource
+import org.apache.crunch.Target
+
+trait PipelineLike {
+  def jpipeline: JPipeline
+
+  /**
+   * Gets the configuration object associated with this pipeline.
+   */
+  def getConfiguration(): Configuration = jpipeline.getConfiguration()
+
+  /**
+   * Reads a source into a [[org.apache.scrunch.PCollection]]
+   *
+   * @param source The source to read from.
+   * @tparam T The type of the values being read.
+   * @return A PCollection containing data read from the specified source.
+   */
+  def read[T](source: Source[T]): PCollection[T] = new PCollection(jpipeline.read(source))
+
+  /**
+   * Reads a source into a [[org.apache.scrunch.PTable]]
+   *
+   * @param source The source to read from.
+   * @tparam K The type of the keys being read.
+   * @tparam V The type of the values being read.
+   * @return A PTable containing data read from the specified source.
+   */
+  def read[K, V](source: TableSource[K, V]): PTable[K, V] = new PTable(jpipeline.read(source))
+
+  /**
+   * Writes a parallel collection to a target.
+   *
+   * @param collection The collection to write.
+   * @param target The destination target for this write.
+   */
+  def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target)
+
+  /**
+   * Writes a parallel table to a target.
+   *
+   * @param table The table to write.
+   * @param target The destination target for this write.
+   */
+  def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target)
+
+  /**
+   * Constructs and executes a series of MapReduce jobs in order
+   * to write data to the output targets.
+   */
+  def run(): Unit = jpipeline.run()
+
+  /**
+   * Run any remaining jobs required to generate outputs and then
+   * clean up any intermediate data files that were created in
+   * this run or previous calls to `run`.
+   */
+  def done(): Unit = jpipeline.done()
+
+  /**
+   * Turn on debug logging for jobs that are run from this pipeline.
+   */
+  def debug(): Unit = jpipeline.enableDebug()
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/CogroupTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/CogroupTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/CogroupTest.scala
deleted file mode 100644
index f25f126..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/CogroupTest.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import com.cloudera.crunch.io.{From => from}
-import com.cloudera.crunch.test.FileHelper
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class CogroupTest extends JUnitSuite {
-  val pipeline = Pipeline.mapReduce[CogroupTest]
-
-  def wordCount(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-        .flatMap(_.toLowerCase.split("\\W+")).count
-  }
-
-  @Test def cogroup {
-    val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
-    val maugham = FileHelper.createTempCopyOf("maugham.txt")
-    val diffs = wordCount(shakespeare).cogroup(wordCount(maugham))
-        .map((k, v) => (k, (v._1.sum - v._2.sum))).materialize
-    assert(diffs.exists(_ == ("the", -11390)))
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/JoinTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/JoinTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/JoinTest.scala
deleted file mode 100644
index b49b016..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/JoinTest.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import com.cloudera.crunch.io.{From => from, To => to}
-import com.cloudera.crunch.test.FileHelper
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class JoinTest extends JUnitSuite {
-  val pipeline = Pipeline.mapReduce[CogroupTest]
-
-  def wordCount(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-        .flatMap(_.toLowerCase.split("\\W+")).count
-  }
-
-  @Test def join {
-    val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
-    val maugham = FileHelper.createTempCopyOf("maugham.txt")
-    val output = FileHelper.createOutputPath()
-    output.deleteOnExit()
-    val filtered = wordCount(shakespeare).join(wordCount(maugham))
-        .map((k, v) => (k, v._1 - v._2))
-        .write(to.textFile(output.getAbsolutePath()))
-        .filter((k, d) => d > 0).materialize
-    assert(filtered.exists(_ == ("macbeth", 66)))
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/PageRankClassTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/PageRankClassTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/PageRankClassTest.scala
deleted file mode 100644
index 1735e09..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/PageRankClassTest.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import Avros._
-
-import com.cloudera.crunch.{DoFn, Emitter, Pair => P}
-import com.cloudera.crunch.io.{From => from}
-import com.cloudera.crunch.test.FileHelper
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Assert._
-import _root_.org.junit.Test
-
-case class PageRankData(pr: Float, oldpr: Float, urls: Array[String]) {
-  def this() = this(0f, 0f, null)
-
-  def scaledPageRank = pr / urls.length
-
-  def next(newPageRank: Float) = new PageRankData(newPageRank, pr, urls)
-
-  def delta = math.abs(pr - oldpr)
-}
-
-class CachingPageRankClassFn extends DoFn[P[String, PageRankData], P[String, Float]] {
-  val cache = new HashMap[String, Float] {
-    override def default(key: String) = 0f
-  }
-
-  override def process(input: P[String, PageRankData], emitFn: Emitter[P[String, Float]]) {
-    val prd = input.second()
-    if (prd.urls.length > 0) {
-      val newpr = prd.pr / prd.urls.length
-      prd.urls.foreach(url => cache.put(url, cache(url) + newpr))
-      if (cache.size > 5000) {
-        cleanup(emitFn)
-      }
-    }
-  }
-
-  override def cleanup(emitFn: Emitter[P[String, Float]]) {
-    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
-    cache.clear
-  }
-}
-
-class PageRankClassTest extends JUnitSuite {
-  val pipeline = Pipeline.mapReduce[PageRankTest]
-
-  def initialInput(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
-      .groupByKey
-      .map((url, links) => (url, PageRankData(1f, 0f, links.filter(x => x != null).toArray)))
-  }
-
-  def update(prev: PTable[String, PageRankData], d: Float) = {
-    val outbound = prev.flatMap((url, prd) => {
-      prd.urls.map(link => (link, prd.scaledPageRank))
-    })
-    cg(prev, outbound, d)
-  }
-
-  def cg(prev: PTable[String, PageRankData],
-         out: PTable[String, Float], d: Float) = {
-    prev.cogroup(out).map((url, v) => {
-      val (p, o) = v
-      val prd = p.head
-      (url, prd.next((1 - d) + d * o.sum))
-    })
-  }
-
-  def fastUpdate(prev: PTable[String, PageRankData], d: Float) = {
-    val outbound = prev.parallelDo(new CachingPageRankClassFn(), tableOf(strings, floats))
-    cg(prev, outbound, d)
-  }
-
-  @Test def testPageRank {
-    pipeline.getConfiguration.set("crunch.debug", "true")
-    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = update(prev, 0.5f)
-      delta = prev.values.map(_.delta).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-
-  def testFastPageRank {
-    pipeline.getConfiguration.set("crunch.debug", "true")
-    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = fastUpdate(prev, 0.5f)
-      delta = prev.values.map(_.delta).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/PageRankTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/PageRankTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/PageRankTest.scala
deleted file mode 100644
index ba9b55c..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/PageRankTest.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import Avros._
-
-import com.cloudera.crunch.{DoFn, Emitter, Pair => P}
-import com.cloudera.crunch.io.{From => from}
-import com.cloudera.crunch.test.FileHelper
-
-import scala.collection.mutable.HashMap
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Assert._
-import _root_.org.junit.Test
-
-class CachingPageRankFn extends DoFn[P[String, (Float, Float, List[String])], P[String, Float]] {
-  val cache = new HashMap[String, Float] {
-    override def default(key: String) = 0f
-  }
-
-  override def process(input: P[String, (Float, Float, List[String])], emitFn: Emitter[P[String, Float]]) {
-    val (pr, oldpr, urls) = input.second()
-    val newpr = pr / urls.size
-    urls.foreach(url => cache.put(url, cache(url) + newpr))
-    if (cache.size > 5000) {
-      cleanup(emitFn)
-    }
-  }
-
-  override def cleanup(emitFn: Emitter[P[String, Float]]) {
-    cache.foreach(kv => emitFn.emit(P.of(kv._1, kv._2)))
-    cache.clear
-  }
-}
-
-class PageRankTest extends JUnitSuite {
-  val pipeline = Pipeline.mapReduce[PageRankTest]
-
-  def initialInput(fileName: String) = {
-    pipeline.read(from.textFile(fileName))
-      .map(line => { val urls = line.split("\\t"); (urls(0), urls(1)) })
-      .groupByKey
-      .map((url, links) => (url, (1f, 0f, links.toList)))
-  }
-
-  def update(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
-    val outbound = prev.flatMap((url, v) => {
-      val (pr, oldpr, links) = v
-      links.map(link => (link, pr / links.size))
-    })
-    cg(prev, outbound, d)
-  }
-
-  def cg(prev: PTable[String, (Float, Float, List[String])],
-         out: PTable[String, Float], d: Float) = {
-    prev.cogroup(out).map((url, v) => {
-      val (p, o) = v
-      val (pr, oldpr, links) = p.head
-      (url, ((1 - d) + d * o.sum, pr, links))
-    })
-  }
-
-  def fastUpdate(prev: PTable[String, (Float, Float, List[String])], d: Float) = {
-    val outbound = prev.parallelDo(new CachingPageRankFn(), tableOf(strings, floats))
-    cg(prev, outbound, d)
-  }
-
-  @Test def testPageRank {
-    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = update(prev, 0.5f)
-      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-
-  @Test def testFastPageRank {
-    var prev = initialInput(FileHelper.createTempCopyOf("urls.txt"))
-    var delta = 1.0f
-    while (delta > 0.01f) {
-      prev = fastUpdate(prev, 0.5f)
-      delta = prev.map((k, v) => math.abs(v._1 - v._2)).max.materialize.head
-    }
-    assertEquals(0.0048, delta, 0.001)
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/PipelineAppTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/PipelineAppTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/PipelineAppTest.scala
deleted file mode 100644
index 2b4ff43..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/PipelineAppTest.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-
-import com.cloudera.crunch.test.FileHelper
-import com.cloudera.scrunch.PipelineApp
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-object WordCount extends PipelineApp {
-
-  def wordSplit(line: String) = line.split("\\W+").filter(!_.isEmpty())
-
-  def countWords(filename: String) = {
-    val lines = read(from.textFile(filename))
-    val words = lines.flatMap(wordSplit)
-    words.count
-  }
-
-  val w1 = countWords(args(0))
-  val w2 = countWords(args(1))
-  cogroup(w1, w2).write(to.textFile(args(2)))
-}
-
-class PipelineAppTest extends JUnitSuite {
-  @Test def run {
-    val args = new Array[String](3)
-    args(0) = FileHelper.createTempCopyOf("shakes.txt")
-    args(1) = FileHelper.createTempCopyOf("maugham.txt")
-    args(2) = FileHelper.createOutputPath.getAbsolutePath
-    WordCount.main(args)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala
deleted file mode 100644
index 738afa9..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/TopTest.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import com.cloudera.crunch.io.{From => from, To => to}
-import com.cloudera.crunch.test.FileHelper
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class TopTest extends JUnitSuite {
-
-  @Test def topInMem {
-    val ptable = Mem.tableOf(("foo", 17), ("bar", 29), ("baz", 1729))
-    assert(ptable.top(1, true).materialize.head == ("baz", 1729))
-  }
-
-  @Test def top2 {
-    val pipeline = Pipeline.mapReduce[TopTest]
-    val input = FileHelper.createTempCopyOf("shakes.txt")
-
-    val wc = pipeline.read(from.textFile(input))
-        .flatMap(_.toLowerCase.split("\\s+"))
-        .filter(!_.isEmpty()).count
-    assert(wc.top(10, true).materialize.exists(_ == ("is", 205)))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/UnionTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/UnionTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/UnionTest.scala
deleted file mode 100644
index 6599a2f..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/UnionTest.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import com.cloudera.crunch.io.{From => from}
-import com.cloudera.crunch.test.FileHelper
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class UnionTest extends JUnitSuite {
-  val pipeline = Pipeline.mapReduce[UnionTest]
-  val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
-  val maugham = FileHelper.createTempCopyOf("maugham.txt")
-
-  def wordCount(col: PCollection[String]) = {
-    col.flatMap(_.toLowerCase.split("\\W+")).count
-  }
-
-  @Test def testUnionCollection {
-    val union = pipeline.read(from.textFile(shakespeare)).union(
-        pipeline.read(from.textFile(maugham)))
-    val wc = wordCount(union).materialize
-    assert(wc.exists(_ == ("you", 3691)))
-    pipeline.done
-  }
-
-  @Test def testUnionTable {
-    val wcs = wordCount(pipeline.read(from.textFile(shakespeare)))
-    val wcm = wordCount(pipeline.read(from.textFile(maugham)))
-    val wc = wcs.union(wcm).groupByKey.combine(v => v.sum).materialize
-    assert(wc.exists(_ == ("you", 3691)))
-    pipeline.done
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/com/cloudera/scrunch/WordCountTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/com/cloudera/scrunch/WordCountTest.scala b/scrunch/src/test/scala/com/cloudera/scrunch/WordCountTest.scala
deleted file mode 100644
index b6a76f0..0000000
--- a/scrunch/src/test/scala/com/cloudera/scrunch/WordCountTest.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (c) 2011, Cloudera, Inc. All Rights Reserved.
- *
- * Cloudera, Inc. licenses this file to you under the Apache License,
- * Version 2.0 (the "License"). You may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
- * CONDITIONS OF ANY KIND, either express or implied. See the License for
- * the specific language governing permissions and limitations under the
- * License.
- */
-package com.cloudera.scrunch
-
-import com.cloudera.crunch.io.{From => from, To => to}
-import com.cloudera.crunch.test.FileHelper
-
-import java.io.File
-
-import org.scalatest.junit.JUnitSuite
-import _root_.org.junit.Test
-
-class WordCountTest extends JUnitSuite {
-  @Test def wordCount {
-    val pipeline = Pipeline.mapReduce[WordCountTest]
-    val input = FileHelper.createTempCopyOf("shakes.txt")
-    val wordCountOut = FileHelper.createOutputPath
-
-    val fcc = pipeline.read(from.textFile(input))
-        .flatMap(_.toLowerCase.split("\\s+"))
-        .filter(!_.isEmpty()).count
-        .write(to.textFile(wordCountOut.getAbsolutePath)) // Word counts
-        .map((w, c) => (w.slice(0, 1), c))
-        .groupByKey.combine(v => v.sum).materialize
-    assert(fcc.exists(_ == ("w", 1404)))
-
-    pipeline.done
-    wordCountOut.delete()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/540345ca/scrunch/src/test/scala/org/apache/scrunch/CogroupTest.scala
----------------------------------------------------------------------
diff --git a/scrunch/src/test/scala/org/apache/scrunch/CogroupTest.scala b/scrunch/src/test/scala/org/apache/scrunch/CogroupTest.scala
new file mode 100644
index 0000000..b0e94a1
--- /dev/null
+++ b/scrunch/src/test/scala/org/apache/scrunch/CogroupTest.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.scrunch
+
+import org.apache.crunch.io.{From => from}
+import org.apache.crunch.test.FileHelper
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.Test
+
+class CogroupTest extends JUnitSuite {
+  val pipeline = Pipeline.mapReduce[CogroupTest]
+
+  def wordCount(fileName: String) = {
+    pipeline.read(from.textFile(fileName))
+        .flatMap(_.toLowerCase.split("\\W+")).count
+  }
+
+  @Test def cogroup {
+    val shakespeare = FileHelper.createTempCopyOf("shakes.txt")
+    val maugham = FileHelper.createTempCopyOf("maugham.txt")
+    val diffs = wordCount(shakespeare).cogroup(wordCount(maugham))
+        .map((k, v) => (k, (v._1.sum - v._2.sum))).materialize
+    assert(diffs.exists(_ == ("the", -11390)))
+    pipeline.done
+  }
+}


Mime
View raw message